aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/guide.tex11
-rw-r--r--src/ejabberd_c2s.erl411
-rw-r--r--src/ejabberd_sm.erl15
-rw-r--r--src/jlib.erl16
4 files changed, 359 insertions, 94 deletions
diff --git a/doc/guide.tex b/doc/guide.tex
index 59f39e256..23752a27d 100644
--- a/doc/guide.tex
+++ b/doc/guide.tex
@@ -871,8 +871,8 @@ The available modules, their purpose and the options allowed by each one are:
Handles c2s connections.\\
Options: \texttt{access}, \texttt{certfile}, \texttt{ciphers},
\texttt{max\_ack\_queue}, \texttt{max\_fsm\_queue},
- \texttt{max\_stanza\_size}, \texttt{shaper},
- \texttt{starttls}, \texttt{starttls\_required},
+ \texttt{max\_stanza\_size}, \texttt{resume\_timeout},
+ \texttt{shaper}, \texttt{starttls}, \texttt{starttls\_required},
\texttt{stream\_management}, \texttt{tls},
\texttt{zlib}, \texttt{tls\_compression}
\titem{\texttt{ejabberd\_s2s\_in}}
@@ -1007,6 +1007,13 @@ request_handlers:
/"a"/"b": mod_foo
/"http-bind": mod_http_bind
\end{verbatim}
+ \titem{resume\_timeout: Seconds}
+ This option configures the number of seconds until a session times
+ out if the connection is lost. During this period of time, a client
+ may resume the session if \term{stream\_management} is enabled. This
+ option can be specified for \term{ejabberd\_c2s} listeners. Setting
+ it to \term{0} effectively disables session resumption. The default
+ value is \term{300}.
\titem{service\_check\_from: true|false}
\ind{options!service\_check\_from}
This option can be used with \term{ejabberd\_service} only.
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index d89529d4e..fb3de4a6d 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -58,6 +58,7 @@
wait_for_bind/2,
wait_for_session/2,
wait_for_sasl_response/2,
+ wait_for_resume/2,
session_established/2,
handle_event/3,
handle_sync_event/4,
@@ -108,10 +109,13 @@
auth_module = unknown,
ip,
aux_fields = [],
+ sm_state,
sm_xmlns,
ack_queue,
max_ack_queue,
manage_stream = fun negotiate_stream_mgmt/2,
+ pending_since,
+ resume_timeout,
n_stanzas_in = 0,
n_stanzas_out = 0,
lang}).
@@ -166,6 +170,7 @@
-define(IS_STREAM_MGMT_TAG(Name),
Name == <<"enable">>;
+ Name == <<"resume">>;
Name == <<"a">>;
Name == <<"r">>).
@@ -183,6 +188,9 @@
-define(SM_BAD_REQUEST(Xmlns),
?SM_FAILED(<<"bad-request">>, Xmlns)).
+-define(SM_ITEM_NOT_FOUND(Xmlns),
+ ?SM_FAILED(<<"item-not-found">>, Xmlns)).
+
-define(SM_SERVICE_UNAVAILABLE(Xmlns),
?SM_FAILED(<<"service-unavailable">>, Xmlns)).
@@ -285,16 +293,18 @@ init([{SockMod, Socket}, Opts]) ->
true -> TLSOpts1
end,
TLSOpts = [verify_none | TLSOpts2],
+ StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true),
+ StreamMgmtState = if StreamMgmtEnabled -> inactive;
+ true -> disabled
+ end,
MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of
Limit when is_integer(Limit), Limit > 0 -> Limit;
_ -> 500
end,
- StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true),
- AckQueue = if not StreamMgmtEnabled ->
- none;
- true ->
- undefined
- end,
+ ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of
+ Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout;
+ _ -> 300
+ end,
IP = peerip(SockMod, Socket),
%% Check if IP is blacklisted:
case is_ip_blacklisted(IP) of
@@ -315,9 +325,11 @@ init([{SockMod, Socket}, Opts]) ->
xml_socket = XMLSocket, zlib = Zlib, tls = TLS,
tls_required = StartTLSRequired,
tls_enabled = TLSEnabled, tls_options = TLSOpts,
- streamid = new_id(), access = Access,
- shaper = Shaper, ip = IP,
- ack_queue = AckQueue, max_ack_queue = MaxAckQueue},
+ sid = {now(), self()}, streamid = new_id(),
+ access = Access, shaper = Shaper, ip = IP,
+ sm_state = StreamMgmtState,
+ max_ack_queue = MaxAckQueue,
+ resume_timeout = ResumeTimeout},
{ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}
end.
@@ -603,7 +615,6 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
?INFO_MSG("(~w) Accepted legacy authentication for ~s by ~p",
[StateData#state.socket,
jlib:jid_to_string(JID), AuthModule]),
- SID = {now(), self()},
Conn = (StateData#state.sockmod):get_conn_type(
StateData#state.socket),
Info = [{ip, StateData#state.ip}, {conn, Conn},
@@ -611,7 +622,9 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
Res = jlib:make_result_iq_reply(
El#xmlel{children = []}),
send_element(StateData, Res),
- ejabberd_sm:open_session(SID, U, StateData#state.server, R, Info),
+ ejabberd_sm:open_session(StateData#state.sid, U,
+ StateData#state.server, R,
+ Info),
change_shaper(StateData, JID),
{Fs, Ts} =
ejabberd_hooks:run_fold(roster_get_subscription_lists,
@@ -629,7 +642,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
[U, StateData#state.server]),
NewStateData = StateData#state{user = U,
resource = R,
- jid = JID, sid = SID,
+ jid = JID,
conn = Conn,
auth_module = AuthModule,
pres_f = (?SETS):from_list(Fs1),
@@ -981,9 +994,20 @@ resource_conflict_action(U, S, R) ->
{accept_resource, Rnew}
end.
-wait_for_bind({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+wait_for_bind({xmlstreamelement, #xmlel{name = Name, attrs = Attrs} = El},
+ StateData)
when ?IS_STREAM_MGMT_TAG(Name) ->
- fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData));
+ case Name of
+ <<"resume">> ->
+ case handle_resume(StateData, Attrs) of
+ {ok, ResumedState} ->
+ fsm_next_state(session_established, ResumedState);
+ error ->
+ fsm_next_state(wait_for_bind, StateData)
+ end;
+ _ ->
+ fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData))
+ end;
wait_for_bind({xmlstreamelement, El}, StateData) ->
case jlib:iq_query_info(El) of
#iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} =
@@ -1077,15 +1101,13 @@ wait_for_session({xmlstreamelement, El}, StateData) ->
privacy_get_user_list, NewState#state.server,
#userlist{},
[U, NewState#state.server]),
- SID = {now(), self()},
Conn = get_conn_type(NewState),
Info = [{ip, NewState#state.ip}, {conn, Conn},
{auth_module, NewState#state.auth_module}],
ejabberd_sm:open_session(
- SID, U, NewState#state.server, R, Info),
+ NewState#state.sid, U, NewState#state.server, R, Info),
UpdatedStateData =
NewState#state{
- sid = SID,
conn = Conn,
pres_f = ?SETS:from_list(Fs1),
pres_t = ?SETS:from_list(Ts1),
@@ -1151,6 +1173,11 @@ session_established({xmlstreamerror, _}, StateData) ->
send_element(StateData, ?INVALID_XML_ERR),
send_trailer(StateData),
{stop, normal, StateData};
+session_established(closed, StateData)
+ when StateData#state.resume_timeout > 0,
+ StateData#state.sm_state == active orelse
+ StateData#state.sm_state == pending ->
+ fsm_next_state(wait_for_resume, StateData#state{sm_state = pending});
session_established(closed, StateData) ->
{stop, normal, StateData}.
@@ -1240,6 +1267,17 @@ session_established2(El, StateData) ->
[{xmlstreamelement, El}]),
fsm_next_state(session_established, NewState).
+wait_for_resume(timeout, StateData) ->
+ ?DEBUG("Timed out waiting for resumption of stream for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]),
+ {stop, normal, StateData};
+wait_for_resume(closed, StateData) ->
+ ?DEBUG("Ignoring 'closed' event while waiting for resumption", []),
+ fsm_next_state(wait_for_resume, StateData);
+wait_for_resume(Event, StateData) ->
+ ?WARNING_MSG("Ignoring event while waiting for resumption: ~p", [Event]),
+ fsm_next_state(wait_for_resume, StateData).
+
%%----------------------------------------------------------------------
%% Func: StateName/3
%% Returns: {next_state, NextStateName, NextStateData} |
@@ -1284,6 +1322,15 @@ handle_sync_event(get_subscribed, _From, StateName,
StateData) ->
Subscribed = (?SETS):to_list(StateData#state.pres_f),
{reply, Subscribed, StateName, StateData};
+handle_sync_event(resume_session, _From, _StateName,
+ StateData) ->
+ %% The old session should be closed before the new one is opened, so we do
+ %% this here instead of leaving it to the terminate callback
+ ejabberd_sm:close_session(StateData#state.sid,
+ StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource),
+ {stop, normal, {ok, StateData}, StateData#state{sm_state = resumed}};
handle_sync_event(_Event, _From, StateName,
StateData) ->
Reply = ok, fsm_reply(Reply, StateName, StateData).
@@ -1612,7 +1659,13 @@ handle_info({route, From, To,
handle_info({'DOWN', Monitor, _Type, _Object, _Info},
_StateName, StateData)
when Monitor == StateData#state.socket_monitor ->
- {stop, normal, StateData};
+ if StateData#state.resume_timeout > 0,
+ StateData#state.sm_state == active orelse
+ StateData#state.sm_state == pending ->
+ fsm_next_state(wait_for_resume, StateData#state{sm_state = pending});
+ true ->
+ {stop, normal, StateData}
+ end;
handle_info(system_shutdown, StateName, StateData) ->
case StateName of
wait_for_stream ->
@@ -1676,61 +1729,71 @@ print_state(State = #state{pres_t = T, pres_f = F, pres_a = A, pres_i = I}) ->
%% Returns: any
%%----------------------------------------------------------------------
terminate(_Reason, StateName, StateData) ->
- case StateName of
- session_established ->
- case StateData#state.authenticated of
- replaced ->
- ?INFO_MSG("(~w) Replaced session for ~s",
- [StateData#state.socket,
- jlib:jid_to_string(StateData#state.jid)]),
- From = StateData#state.jid,
- Packet = #xmlel{name = <<"presence">>,
- attrs = [{<<"type">>, <<"unavailable">>}],
- children =
- [#xmlel{name = <<"status">>, attrs = [],
- children =
- [{xmlcdata,
- <<"Replaced by new connection">>}]}]},
- ejabberd_sm:close_session_unset_presence(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource,
- <<"Replaced by new connection">>),
- presence_broadcast(StateData, From,
- StateData#state.pres_a, Packet),
- presence_broadcast(StateData, From,
- StateData#state.pres_i, Packet);
- _ ->
- ?INFO_MSG("(~w) Close session for ~s",
- [StateData#state.socket,
- jlib:jid_to_string(StateData#state.jid)]),
- EmptySet = (?SETS):new(),
- case StateData of
- #state{pres_last = undefined, pres_a = EmptySet, pres_i = EmptySet, pres_invis = false} ->
- ejabberd_sm:close_session(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource);
- _ ->
- From = StateData#state.jid,
- Packet = #xmlel{name = <<"presence">>,
- attrs = [{<<"type">>, <<"unavailable">>}],
- children = []},
- ejabberd_sm:close_session_unset_presence(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource,
- <<"">>),
- presence_broadcast(StateData, From,
- StateData#state.pres_a, Packet),
- presence_broadcast(StateData, From,
- StateData#state.pres_i, Packet)
- end
- end,
- resend_unacked_stanzas(StateData),
- bounce_messages();
+ case StateData#state.sm_state of
+ resumed ->
+ ?INFO_MSG("Closing former stream of resumed session for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]);
_ ->
- ok
+ if StateName == session_established;
+ StateName == wait_for_resume ->
+ case StateData#state.authenticated of
+ replaced ->
+ ?INFO_MSG("(~w) Replaced session for ~s",
+ [StateData#state.socket,
+ jlib:jid_to_string(StateData#state.jid)]),
+ From = StateData#state.jid,
+ Packet = #xmlel{name = <<"presence">>,
+ attrs = [{<<"type">>, <<"unavailable">>}],
+ children =
+ [#xmlel{name = <<"status">>, attrs = [],
+ children =
+ [{xmlcdata,
+ <<"Replaced by new connection">>}]}]},
+ ejabberd_sm:close_session_unset_presence(StateData#state.sid,
+ StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource,
+ <<"Replaced by new connection">>),
+ presence_broadcast(StateData, From,
+ StateData#state.pres_a, Packet),
+ presence_broadcast(StateData, From,
+ StateData#state.pres_i, Packet),
+ resend_unacked_stanzas(StateData);
+ _ ->
+ ?INFO_MSG("(~w) Close session for ~s",
+ [StateData#state.socket,
+ jlib:jid_to_string(StateData#state.jid)]),
+ EmptySet = (?SETS):new(),
+ case StateData of
+ #state{pres_last = undefined,
+ pres_a = EmptySet,
+ pres_i = EmptySet,
+ pres_invis = false} ->
+ ejabberd_sm:close_session(StateData#state.sid,
+ StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource);
+ _ ->
+ From = StateData#state.jid,
+ Packet = #xmlel{name = <<"presence">>,
+ attrs = [{<<"type">>, <<"unavailable">>}],
+ children = []},
+ ejabberd_sm:close_session_unset_presence(StateData#state.sid,
+ StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource,
+ <<"">>),
+ presence_broadcast(StateData, From,
+ StateData#state.pres_a, Packet),
+ presence_broadcast(StateData, From,
+ StateData#state.pres_i, Packet)
+ end,
+ resend_unacked_stanzas(StateData)
+ end,
+ bounce_messages();
+ true ->
+ ok
+ end
end,
(StateData#state.sockmod):close(StateData#state.socket),
ok.
@@ -1745,6 +1808,8 @@ change_shaper(StateData, JID) ->
(StateData#state.sockmod):change_shaper(StateData#state.socket,
Shaper).
+send_text(StateData, Text) when StateData#state.sm_state == pending ->
+ ?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]);
send_text(StateData, Text) when StateData#state.xml_socket ->
?DEBUG("Send Text on stream = ~p", [Text]),
(StateData#state.sockmod):send_xml(StateData#state.socket,
@@ -1753,13 +1818,17 @@ send_text(StateData, Text) ->
?DEBUG("Send XML on stream = ~p", [Text]),
(StateData#state.sockmod):send(StateData#state.socket, Text).
+send_element(StateData, El) when StateData#state.sm_state == pending ->
+ ?DEBUG("Cannot send element while waiting for resumption: ~p", [El]);
send_element(StateData, El) when StateData#state.xml_socket ->
(StateData#state.sockmod):send_xml(StateData#state.socket,
{xmlstreamelement, El});
send_element(StateData, El) ->
send_text(StateData, xml:element_to_binary(El)).
-send_stanza(StateData, Stanza) when StateData#state.sm_xmlns /= undefined ->
+send_stanza(StateData, Stanza) when StateData#state.sm_state == pending ->
+ ack_queue_add(StateData, Stanza);
+send_stanza(StateData, Stanza) when StateData#state.sm_state == active ->
send_stanza_and_ack_req(StateData, Stanza),
ack_queue_add(StateData, Stanza);
send_stanza(StateData, Stanza) ->
@@ -2356,6 +2425,15 @@ fsm_next_state_gc(StateName, PackedStateData) ->
fsm_next_state(session_established, StateData) ->
{next_state, session_established, StateData,
?C2S_HIBERNATE_TIMEOUT};
+fsm_next_state(wait_for_resume, #state{pending_since = undefined} =
+ StateData) ->
+ {next_state, wait_for_resume,
+ StateData#state{pending_since = os:timestamp()},
+ StateData#state.resume_timeout};
+fsm_next_state(wait_for_resume, StateData) ->
+ Diff = timer:now_diff(os:timestamp(), StateData#state.pending_since),
+ Timeout = max(StateData#state.resume_timeout - Diff div 1000, 1),
+ {next_state, wait_for_resume, StateData, Timeout};
fsm_next_state(StateName, StateData) ->
{next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
@@ -2364,6 +2442,15 @@ fsm_next_state(StateName, StateData) ->
fsm_reply(Reply, session_established, StateData) ->
{reply, Reply, session_established, StateData,
?C2S_HIBERNATE_TIMEOUT};
+fsm_reply(Reply, wait_for_resume, #state{pending_since = undefined} =
+ StateData) ->
+ {reply, Reply, wait_for_resume,
+ StateData#state{pending_since = os:timestamp()},
+ StateData#state.resume_timeout};
+fsm_reply(Reply, wait_for_resume, StateData) ->
+ Diff = timer:now_diff(os:timestamp(), StateData#state.pending_since),
+ Timeout = max(StateData#state.resume_timeout - Diff div 1000, 1),
+ {reply, Reply, wait_for_resume, StateData, Timeout};
fsm_reply(Reply, StateName, StateData) ->
{reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
@@ -2462,7 +2549,7 @@ route_blocking(What, StateData) ->
%%% XEP-0198
%%%----------------------------------------------------------------------
-stream_mgmt_enabled(#state{ack_queue = none}) ->
+stream_mgmt_enabled(#state{sm_state = disabled}) ->
false;
stream_mgmt_enabled(_StateData) ->
true.
@@ -2470,8 +2557,9 @@ stream_mgmt_enabled(_StateData) ->
negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) ->
%% XEP-0198 says: "For client-to-server connections, the client MUST NOT
%% attempt to enable stream management until after it has completed Resource
- %% Binding". However, it also says: "Stream management errors SHOULD be
- %% considered recoverable", so we won't bail out.
+ %% Binding unless it is resuming a previous session". However, it also
+ %% says: "Stream management errors SHOULD be considered recoverable", so we
+ %% won't bail out.
send_element(StateData, ?SM_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)),
StateData;
negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
@@ -2481,10 +2569,11 @@ negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
true ->
case Name of
<<"enable">> ->
- handle_enable(StateData#state{sm_xmlns = Xmlns});
+ handle_enable(StateData#state{sm_xmlns = Xmlns}, Attrs);
_ ->
Res = if Name == <<"a">>;
- Name == <<"r">> ->
+ Name == <<"r">>;
+ Name == <<"resume">> ->
?SM_UNEXPECTED_REQUEST(Xmlns);
true ->
?SM_BAD_REQUEST(Xmlns)
@@ -2510,7 +2599,8 @@ perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
<<"a">> ->
handle_a(StateData, Attrs);
_ ->
- Res = if Name == <<"enable">> ->
+ Res = if Name == <<"enable">>;
+ Name == <<"resume">> ->
?SM_UNEXPECTED_REQUEST(Xmlns);
true ->
?SM_BAD_REQUEST(Xmlns)
@@ -2524,15 +2614,40 @@ perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
StateData
end.
-handle_enable(StateData) ->
- ?INFO_MSG("Enabling XEP-0198 stream management for ~s",
- [jlib:jid_to_string(StateData#state.jid)]),
+handle_enable(#state{resume_timeout = ConfigTimeout} = StateData, Attrs) ->
+ Timeout = case xml:get_attr_s(<<"resume">>, Attrs) of
+ ResumeAttr when ResumeAttr == <<"true">>;
+ ResumeAttr == <<"1">> ->
+ MaxAttr = xml:get_attr_s(<<"max">>, Attrs),
+ case catch jlib:binary_to_integer(MaxAttr) of
+ Max when is_integer(Max), Max > 0, Max =< ConfigTimeout ->
+ Max;
+ _ ->
+ ConfigTimeout
+ end;
+ _ ->
+ 0
+ end,
+ ResAttrs = [{<<"xmlns">>, StateData#state.sm_xmlns}] ++
+ if Timeout > 0 ->
+ ?INFO_MSG("Stream management with resumption enabled for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]),
+ [{<<"id">>, make_resume_id(StateData)},
+ {<<"resume">>, <<"true">>},
+ {<<"max">>, jlib:integer_to_binary(Timeout)}];
+ true ->
+ ?INFO_MSG("Stream management without resumption enabled for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]),
+ []
+ end,
Res = #xmlel{name = <<"enabled">>,
- attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}],
+ attrs = ResAttrs,
children = []},
send_element(StateData, Res),
- StateData#state{ack_queue = queue:new(),
- manage_stream = fun perform_stream_mgmt/2}.
+ StateData#state{sm_state = active,
+ ack_queue = queue:new(),
+ manage_stream = fun perform_stream_mgmt/2,
+ resume_timeout = Timeout * 1000}.
handle_r(StateData) ->
H = jlib:integer_to_binary(StateData#state.n_stanzas_in),
@@ -2559,7 +2674,59 @@ handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) ->
StateData
end.
-update_num_stanzas_in(StateData, El) when StateData#state.sm_xmlns /= undefined ->
+handle_resume(StateData, Attrs) ->
+ R = case xml:get_attr_s(<<"xmlns">>, Attrs) of
+ Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) ->
+ case stream_mgmt_enabled(StateData) of
+ true ->
+ case {xml:get_attr(<<"previd">>, Attrs),
+ catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs))}
+ of
+ {{value, PrevID}, H} when is_integer(H) ->
+ case inherit_session_state(StateData, PrevID) of
+ {ok, InheritedState} ->
+ {ok, InheritedState, H};
+ {error, Err} ->
+ {error, ?SM_ITEM_NOT_FOUND(Xmlns), Err}
+ end;
+ _ ->
+ {error, ?SM_BAD_REQUEST(Xmlns), <<"Invalid request">>}
+ end;
+ false ->
+ {error, ?SM_SERVICE_UNAVAILABLE(Xmlns), <<"XEP-0198 disabled">>}
+ end;
+ _ ->
+ {error, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3), <<"Invalid XMLNS">>}
+ end,
+ case R of
+ {ok, ResumedState, NumHandled} ->
+ NewState = ack_queue_drop(ResumedState, NumHandled),
+ AttrXmlns = NewState#state.sm_xmlns,
+ AttrId = make_resume_id(NewState),
+ AttrH = jlib:integer_to_binary(NewState#state.n_stanzas_in),
+ send_element(NewState,
+ #xmlel{name = <<"resumed">>,
+ attrs = [{<<"xmlns">>, AttrXmlns},
+ {<<"h">>, AttrH},
+ {<<"previd">>, AttrId}],
+ children = []}),
+ SendFun = fun(_F, _T, El) -> send_element(NewState, El) end,
+ resend_unacked_stanzas(NewState, SendFun),
+ send_element(NewState,
+ #xmlel{name = <<"r">>,
+ attrs = [{<<"xmlns">>, AttrXmlns}],
+ children = []}),
+ ?INFO_MSG("Resumed session for ~s",
+ [jlib:jid_to_string(NewState#state.jid)]),
+ {ok, NewState};
+ {error, El, Msg} ->
+ send_element(StateData, El),
+ ?WARNING_MSG("Cannot resume session for ~s@~s: ~s",
+ [StateData#state.user, StateData#state.server, Msg]),
+ error
+ end.
+
+update_num_stanzas_in(#state{sm_state = active} = StateData, El) ->
NewNum = case {is_stanza(El), StateData#state.n_stanzas_in} of
{true, 4294967295} ->
0;
@@ -2612,7 +2779,8 @@ limit_queue_length(#state{jid = JID,
StateData
end.
-resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined ->
+resend_unacked_stanzas(StateData, F) when StateData#state.sm_state == active;
+ StateData#state.sm_state == pending ->
Queue = StateData#state.ack_queue,
case queue:len(Queue) of
0 ->
@@ -2628,12 +2796,79 @@ resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined ->
To = jlib:string_to_jid(To_s),
?DEBUG("Resending unacknowledged stanza #~B from ~s to ~s",
[Num, From_s, To_s]),
- ejabberd_router:route(From, To, El)
+ F(From, To, El)
end, queue:to_list(Queue))
end;
+resend_unacked_stanzas(_StateData, _F) ->
+ ok.
+
+resend_unacked_stanzas(StateData) when StateData#state.sm_state == active;
+ StateData#state.sm_state == pending ->
+ resend_unacked_stanzas(StateData, fun ejabberd_router:route/3);
resend_unacked_stanzas(_StateData) ->
ok.
+inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
+ case jlib:base64_to_term(ResumeID) of
+ {term, {U, S, R, Time}} ->
+ case ejabberd_sm:get_session_pid(U, S, R) of
+ none ->
+ {error, <<"Previous session PID not found">>};
+ OldPID ->
+ OldSID = {Time, OldPID},
+ case catch resume_session(OldPID) of
+ {ok, #state{sid = OldSID} = OldStateData} ->
+ NewSID = {Time, self()}, % Old time, new PID
+ Priority = case OldStateData#state.pres_last of
+ undefined ->
+ 0;
+ Presence ->
+ get_priority_from_presence(Presence)
+ end,
+ Conn = get_conn_type(StateData),
+ Info = [{ip, StateData#state.ip}, {conn, Conn},
+ {auth_module, StateData#state.auth_module}],
+ ejabberd_sm:open_session(NewSID, U, S, R,
+ Priority, Info),
+ {ok, StateData#state{sid = NewSID,
+ jid = OldStateData#state.jid,
+ resource = OldStateData#state.resource,
+ pres_t = OldStateData#state.pres_t,
+ pres_f = OldStateData#state.pres_f,
+ pres_a = OldStateData#state.pres_a,
+ pres_i = OldStateData#state.pres_i,
+ pres_last = OldStateData#state.pres_last,
+ pres_pri = OldStateData#state.pres_pri,
+ pres_timestamp = OldStateData#state.pres_timestamp,
+ pres_invis = OldStateData#state.pres_invis,
+ privacy_list = OldStateData#state.privacy_list,
+ aux_fields = OldStateData#state.aux_fields,
+ sm_xmlns = OldStateData#state.sm_xmlns,
+ ack_queue = OldStateData#state.ack_queue,
+ manage_stream = fun perform_stream_mgmt/2,
+ resume_timeout = OldStateData#state.resume_timeout,
+ n_stanzas_in = OldStateData#state.n_stanzas_in,
+ n_stanzas_out = OldStateData#state.n_stanzas_out,
+ sm_state = active}};
+ _ ->
+ {error, <<"Cannot grab session state">>}
+ end
+ end;
+ error ->
+ {error, <<"Invalid 'previd' value">>}
+ end.
+
+resume_session(FsmRef) ->
+ (?GEN_FSM):sync_send_all_state_event(FsmRef, resume_session, 3000).
+
+make_resume_id(StateData) ->
+ {Time, _} = StateData#state.sid,
+ ID = {StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource,
+ Time},
+ jlib:term_to_base64(ID).
+
%%%----------------------------------------------------------------------
%%% JID Set memory footprint reduction code
%%%----------------------------------------------------------------------
diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl
index c7e277a8d..6bcacbe78 100644
--- a/src/ejabberd_sm.erl
+++ b/src/ejabberd_sm.erl
@@ -33,7 +33,9 @@
%% API
-export([start_link/0,
route/3,
- open_session/5, close_session/4,
+ open_session/5,
+ open_session/6,
+ close_session/4,
check_in_subscription/6,
bounce_offline_message/3,
disconnect_removed_user/2,
@@ -107,10 +109,10 @@ route(From, To, Packet) ->
_ -> ok
end.
--spec open_session(sid(), binary(), binary(), binary(), info()) -> ok.
+-spec open_session(sid(), binary(), binary(), binary(), prio(), info()) -> ok.
-open_session(SID, User, Server, Resource, Info) ->
- set_session(SID, User, Server, Resource, undefined, Info),
+open_session(SID, User, Server, Resource, Priority, Info) ->
+ set_session(SID, User, Server, Resource, Priority, Info),
mnesia:dirty_update_counter(session_counter,
jlib:nameprep(Server), 1),
check_for_sessions_to_replace(User, Server, Resource),
@@ -118,6 +120,11 @@ open_session(SID, User, Server, Resource, Info) ->
ejabberd_hooks:run(sm_register_connection_hook,
JID#jid.lserver, [SID, JID, Info]).
+-spec open_session(sid(), binary(), binary(), binary(), info()) -> ok.
+
+open_session(SID, User, Server, Resource, Info) ->
+ open_session(SID, User, Server, Resource, undefined, Info).
+
-spec close_session(sid(), binary(), binary(), binary()) -> ok.
close_session(SID, User, Server, Resource) ->
diff --git a/src/jlib.erl b/src/jlib.erl
index a362697f4..ffabb3ffe 100644
--- a/src/jlib.erl
+++ b/src/jlib.erl
@@ -46,6 +46,7 @@
timestamp_to_iso/2, timestamp_to_xml/4,
timestamp_to_xml/1, now_to_utc_string/1,
now_to_local_string/1, datetime_string_to_timestamp/1,
+ term_to_base64/1, base64_to_term/1,
decode_base64/1, encode_base64/1, ip_to_list/1,
rsm_encode/1, rsm_encode/2, rsm_decode/1,
binary_to_integer/1, binary_to_integer/2,
@@ -780,6 +781,21 @@ check_list(List) ->
% Base64 stuff (based on httpd_util.erl)
%
+-spec term_to_base64(term()) -> binary().
+
+term_to_base64(Term) ->
+ encode_base64(term_to_binary(Term)).
+
+-spec base64_to_term(binary()) -> {term, term()} | error.
+
+base64_to_term(Base64) ->
+ case catch binary_to_term(decode_base64(Base64), [safe]) of
+ {'EXIT', _} ->
+ error;
+ Term ->
+ {term, Term}
+ end.
+
-spec decode_base64(binary()) -> binary().
decode_base64(S) ->