aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ejabberd_c2s.erl8
-rw-r--r--src/mod_stream_mgmt.erl86
2 files changed, 54 insertions, 40 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index c88221e27..92517c92a 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -180,10 +180,9 @@ host_down(Host) ->
%% Copies content of one c2s state to another.
%% This is needed for session migration from one pid to another.
-spec copy_state(state(), state()) -> state().
-copy_state(#{owner := Owner} = NewState,
- #{jid := JID, resource := Resource, sid := {Time, _},
- auth_module := AuthModule, lserver := LServer,
- pres_a := PresA} = OldState) ->
+copy_state(NewState,
+ #{jid := JID, resource := Resource, auth_module := AuthModule,
+ lserver := LServer, pres_a := PresA} = OldState) ->
State1 = case OldState of
#{pres_last := Pres, pres_timestamp := PresTS} ->
NewState#{pres_last => Pres, pres_timestamp => PresTS};
@@ -193,7 +192,6 @@ copy_state(#{owner := Owner} = NewState,
Conn = get_conn_type(State1),
State2 = State1#{jid => JID, resource => Resource,
conn => Conn,
- sid => {Time, Owner},
auth_module => AuthModule,
pres_a => PresA},
ejabberd_hooks:run_fold(c2s_copy_session, LServer, State2, [OldState]).
diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl
index 45d52ccb0..ee72e152f 100644
--- a/src/mod_stream_mgmt.erl
+++ b/src/mod_stream_mgmt.erl
@@ -52,6 +52,7 @@
-type state() :: ejabberd_c2s:state().
-type queue() :: p1_queue:queue({non_neg_integer(), erlang:timestamp(), xmpp_element() | xmlel()}).
+-type id() :: binary().
-type error_reason() :: session_not_found | session_timed_out |
session_is_dead | session_has_exited |
session_was_killed | session_copy_timed_out |
@@ -228,8 +229,8 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod,
c2s_handle_send(State, _Pkt, _Result) ->
State.
-c2s_handle_call(#{sid := {Time, _}, mod := Mod, mgmt_queue := Queue} = State,
- {resume_session, Time}, From) ->
+c2s_handle_call(#{mgmt_id := MgmtID, mgmt_queue := Queue, mod := Mod} = State,
+ {resume_session, MgmtID}, From) ->
State1 = State#{mgmt_queue => p1_queue:file_to_ram(Queue)},
Mod:reply(From, {resume, State1}),
{stop, State#{mgmt_state => resumed, mgmt_queue => p1_queue:clear(Queue)}};
@@ -288,10 +289,10 @@ c2s_terminated(#{mgmt_state := resumed, sid := SID, jid := JID} = State, _Reason
ejabberd_c2s:bounce_message_queue(SID, JID),
{stop, State};
c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In,
- sid := {Time, _}, jid := JID} = State, _Reason) ->
+ mgmt_id := MgmtID, jid := JID} = State, _Reason) ->
case MgmtState of
timeout ->
- store_stanzas_in(jid:tolower(JID), Time, In);
+ store_stanzas_in(jid:tolower(JID), MgmtID, In);
_ ->
ok
end,
@@ -377,6 +378,7 @@ handle_enable(#{mgmt_timeout := DefaultTimeout,
mgmt_max_timeout := MaxTimeout,
mgmt_xmlns := Xmlns, jid := JID} = State,
#sm_enable{resume = Resume, max = Max}) ->
+ State1 = State#{mgmt_id => make_id()},
Timeout = if Resume == false ->
0;
Max /= undefined, Max > 0, Max*1000 =< MaxTimeout ->
@@ -388,7 +390,7 @@ handle_enable(#{mgmt_timeout := DefaultTimeout,
?DEBUG("Stream management with resumption enabled for ~ts",
[jid:encode(JID)]),
#sm_enabled{xmlns = Xmlns,
- id = make_resume_id(State),
+ id = encode_id(State1),
resume = true,
max = Timeout div 1000};
true ->
@@ -396,10 +398,10 @@ handle_enable(#{mgmt_timeout := DefaultTimeout,
[jid:encode(JID)]),
#sm_enabled{xmlns = Xmlns}
end,
- State1 = State#{mgmt_state => active,
- mgmt_queue => p1_queue:new(QueueType),
- mgmt_timeout => Timeout},
- send(State1, Res).
+ State2 = State1#{mgmt_state => active,
+ mgmt_queue => p1_queue:new(QueueType),
+ mgmt_timeout => Timeout},
+ send(State2, Res).
-spec handle_r(state()) -> state().
handle_r(#{mgmt_xmlns := Xmlns, mgmt_stanzas_in := H} = State) ->
@@ -431,10 +433,9 @@ handle_resume(#{user := User, lserver := LServer,
{ok, #{jid := JID} = ResumedState, NumHandled} ->
State1 = check_h_attribute(ResumedState, NumHandled),
#{mgmt_xmlns := AttrXmlns, mgmt_stanzas_in := AttrH} = State1,
- AttrId = make_resume_id(State1),
State2 = send(State1, #sm_resumed{xmlns = AttrXmlns,
h = AttrH,
- previd = AttrId}),
+ previd = PrevID}),
State3 = resend_unacked_stanzas(State2),
State4 = send(State3, #sm_r{xmlns = AttrXmlns}),
State5 = ejabberd_hooks:run_fold(c2s_session_resumed, LServer, State4, []),
@@ -649,20 +650,19 @@ route_unacked_stanzas(_State) ->
{error, error_reason()} |
{error, error_reason(), non_neg_integer()}.
inherit_session_state(#{user := U, server := S,
- mgmt_queue_type := QueueType} = State, ResumeID) ->
- case misc:base64_to_term(ResumeID) of
- {term, {R, Time}} ->
- case ejabberd_sm:get_session_pid(U, S, R) of
+ mgmt_queue_type := QueueType} = State, PrevID) ->
+ case decode_id(PrevID) of
+ {ok, {R, MgmtID}} ->
+ case ejabberd_sm:get_session_sid(U, S, R) of
none ->
- case pop_stanzas_in({U, S, R}, Time) of
+ case pop_stanzas_in({U, S, R}, MgmtID) of
error ->
{error, session_not_found};
{ok, H} ->
{error, session_timed_out, H}
end;
- OldPID ->
- OldSID = {Time, OldPID},
- try resume_session(OldSID, State) of
+ {_, OldPID} = OldSID ->
+ try resume_session(OldPID, MgmtID, State) of
{resume, #{mgmt_xmlns := Xmlns,
mgmt_queue := Queue,
mgmt_timeout := Timeout,
@@ -673,7 +673,9 @@ inherit_session_state(#{user := U, server := S,
ram -> Queue;
_ -> p1_queue:ram_to_file(Queue)
end,
- State2 = State1#{mgmt_xmlns => Xmlns,
+ State2 = State1#{sid => ejabberd_sm:make_sid(),
+ mgmt_id => MgmtID,
+ mgmt_xmlns => Xmlns,
mgmt_queue => Queue1,
mgmt_timeout => Timeout,
mgmt_stanzas_in => NumStanzasIn,
@@ -698,18 +700,14 @@ inherit_session_state(#{user := U, server := S,
{error, session_copy_timed_out}
end
end;
- _ ->
+ error ->
{error, invalid_previd}
end.
--spec resume_session({erlang:timestamp(), pid()}, state()) -> {resume, state()} |
- {error, error_reason()}.
-resume_session({Time, Pid}, _State) ->
- ejabberd_c2s:call(Pid, {resume_session, Time}, timer:seconds(15)).
-
--spec make_resume_id(state()) -> binary().
-make_resume_id(#{sid := {Time, _}, resource := Resource}) ->
- misc:term_to_base64({Resource, Time}).
+-spec resume_session(pid(), id(), state()) -> {resume, state()} |
+ {error, error_reason()}.
+resume_session(PID, MgmtID, _State) ->
+ ejabberd_c2s:call(PID, {resume_session, MgmtID}, timer:seconds(15)).
-spec add_resent_delay_info(state(), stanza(), erlang:timestamp()) -> stanza();
(state(), xmlel(), erlang:timestamp()) -> xmlel().
@@ -756,6 +754,24 @@ need_to_enqueue(#{mgmt_force_enqueue := true} = State, #xmlel{}) ->
need_to_enqueue(State, _) ->
{false, State}.
+-spec make_id() -> id().
+make_id() ->
+ p1_rand:bytes(8).
+
+-spec encode_id(state()) -> binary().
+encode_id(#{mgmt_id := MgmtID, resource := Resource}) ->
+ misc:term_to_base64({Resource, MgmtID}).
+
+-spec decode_id(binary()) -> {ok, {binary(), id()}} | error.
+decode_id(Encoded) ->
+ case misc:base64_to_term(Encoded) of
+ {term, {Resource, MgmtID}} when is_binary(Resource),
+ is_binary(MgmtID) ->
+ {ok, {Resource, MgmtID}};
+ _ ->
+ error
+ end.
+
%%%===================================================================
%%% Formatters and Logging
%%%===================================================================
@@ -803,14 +819,14 @@ cache_opts(Opts) ->
{life_time, mod_stream_mgmt_opt:cache_life_time(Opts)},
{type, ordered_set}].
--spec store_stanzas_in(ljid(), erlang:timestamp(), non_neg_integer()) -> boolean().
-store_stanzas_in(LJID, Time, Num) ->
- ets_cache:insert(?STREAM_MGMT_CACHE, {LJID, Time}, Num,
+-spec store_stanzas_in(ljid(), id(), non_neg_integer()) -> boolean().
+store_stanzas_in(LJID, MgmtID, Num) ->
+ ets_cache:insert(?STREAM_MGMT_CACHE, {LJID, MgmtID}, Num,
ejabberd_cluster:get_nodes()).
--spec pop_stanzas_in(ljid(), erlang:timestamp()) -> {ok, non_neg_integer()} | error.
-pop_stanzas_in(LJID, Time) ->
- case ets_cache:lookup(?STREAM_MGMT_CACHE, {LJID, Time}) of
+-spec pop_stanzas_in(ljid(), id()) -> {ok, non_neg_integer()} | error.
+pop_stanzas_in(LJID, MgmtID) ->
+ case ets_cache:lookup(?STREAM_MGMT_CACHE, {LJID, MgmtID}) of
{ok, Val} ->
ets_cache:match_delete(?STREAM_MGMT_CACHE, {LJID, '_'},
ejabberd_cluster:get_nodes()),