aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ejabberd_c2s.erl837
-rw-r--r--src/ejabberd_sm.erl15
-rw-r--r--src/jlib.erl33
3 files changed, 754 insertions, 131 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index f2e16e15b..8874c48ad 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -57,6 +57,7 @@
wait_for_bind/2,
wait_for_session/2,
wait_for_sasl_response/2,
+ wait_for_resume/2,
session_established/2,
handle_event/3,
handle_sync_event/4,
@@ -107,6 +108,15 @@
auth_module = unknown,
ip,
aux_fields = [],
+ mgmt_state,
+ mgmt_xmlns,
+ mgmt_queue,
+ mgmt_max_queue,
+ mgmt_pending_since,
+ mgmt_timeout,
+ mgmt_resend,
+ mgmt_stanzas_in = 0,
+ mgmt_stanzas_out = 0,
lang = <<"">>}).
%-define(DBGFSM, true).
@@ -155,6 +165,39 @@
-define(INVALID_FROM, ?SERR_INVALID_FROM).
+%% XEP-0198:
+
+-define(IS_STREAM_MGMT_TAG(Name),
+ Name == <<"enable">>;
+ Name == <<"resume">>;
+ Name == <<"a">>;
+ Name == <<"r">>).
+
+-define(IS_SUPPORTED_MGMT_XMLNS(Xmlns),
+ Xmlns == ?NS_STREAM_MGMT_2;
+ Xmlns == ?NS_STREAM_MGMT_3).
+
+-define(MGMT_FAILED(Condition, Xmlns),
+ #xmlel{name = <<"failed">>,
+ attrs = [{<<"xmlns">>, Xmlns}],
+ children = [#xmlel{name = Condition,
+ attrs = [{<<"xmlns">>, ?NS_STANZAS}],
+ children = []}]}).
+
+-define(MGMT_BAD_REQUEST(Xmlns),
+ ?MGMT_FAILED(<<"bad-request">>, Xmlns)).
+
+-define(MGMT_ITEM_NOT_FOUND(Xmlns),
+ ?MGMT_FAILED(<<"item-not-found">>, Xmlns)).
+
+-define(MGMT_SERVICE_UNAVAILABLE(Xmlns),
+ ?MGMT_FAILED(<<"service-unavailable">>, Xmlns)).
+
+-define(MGMT_UNEXPECTED_REQUEST(Xmlns),
+ ?MGMT_FAILED(<<"unexpected-request">>, Xmlns)).
+
+-define(MGMT_UNSUPPORTED_VERSION(Xmlns),
+ ?MGMT_FAILED(<<"unsupported-version">>, Xmlns)).
%%%----------------------------------------------------------------------
%%% API
@@ -258,6 +301,19 @@ init([{SockMod, Socket}, Opts]) ->
true -> TLSOpts2
end,
TLSOpts = [verify_none | TLSOpts3],
+ StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true),
+ StreamMgmtState = if StreamMgmtEnabled -> inactive;
+ true -> disabled
+ end,
+ MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of
+ Limit when is_integer(Limit), Limit > 0 -> Limit;
+ _ -> 500
+ end,
+ ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of
+ Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout;
+ _ -> 300
+ end,
+ ResendOnTimeout = proplists:get_bool(resend_on_timeout, Opts),
IP = peerip(SockMod, Socket),
%% Check if IP is blacklisted:
case is_ip_blacklisted(IP) of
@@ -278,8 +334,12 @@ init([{SockMod, Socket}, Opts]) ->
xml_socket = XMLSocket, zlib = Zlib, tls = TLS,
tls_required = StartTLSRequired,
tls_enabled = TLSEnabled, tls_options = TLSOpts,
- streamid = new_id(), access = Access,
- shaper = Shaper, ip = IP},
+ sid = {now(), self()}, streamid = new_id(),
+ access = Access, shaper = Shaper, ip = IP,
+ mgmt_state = StreamMgmtState,
+ mgmt_max_queue = MaxAckQueue,
+ mgmt_timeout = ResumeTimeout,
+ mgmt_resend = ResendOnTimeout},
{ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}
end.
@@ -410,6 +470,18 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
ejabberd_hooks:run_fold(roster_get_versioning_feature,
Server, [],
[Server]),
+ StreamManagementFeature =
+ case stream_mgmt_enabled(StateData) of
+ true ->
+ [#xmlel{name = <<"sm">>,
+ attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_2}],
+ children = []},
+ #xmlel{name = <<"sm">>,
+ attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_3}],
+ children = []}];
+ false ->
+ []
+ end,
StreamFeatures = [#xmlel{name = <<"bind">>,
attrs = [{<<"xmlns">>, ?NS_BIND}],
children = []},
@@ -418,6 +490,7 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
children = []}]
++
RosterVersioningFeature ++
+ StreamManagementFeature ++
ejabberd_hooks:run_fold(c2s_stream_features,
Server, [], [Server]),
send_element(StateData,
@@ -480,6 +553,9 @@ wait_for_stream({xmlstreamerror, _}, StateData) ->
wait_for_stream(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_auth({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_auth, dispatch_stream_mgmt(El, StateData));
wait_for_auth({xmlstreamelement, El}, StateData) ->
case is_auth_packet(El) of
{auth, _ID, get, {U, _, _, _}} ->
@@ -549,14 +625,15 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
?INFO_MSG("(~w) Accepted legacy authentication for ~s by ~p",
[StateData#state.socket,
jlib:jid_to_string(JID), AuthModule]),
- SID = {now(), self()},
Conn = get_conn_type(StateData),
Info = [{ip, StateData#state.ip}, {conn, Conn},
{auth_module, AuthModule}],
Res = jlib:make_result_iq_reply(
El#xmlel{children = []}),
send_element(StateData, Res),
- ejabberd_sm:open_session(SID, U, StateData#state.server, R, Info),
+ ejabberd_sm:open_session(StateData#state.sid, U,
+ StateData#state.server, R,
+ Info),
change_shaper(StateData, JID),
{Fs, Ts} =
ejabberd_hooks:run_fold(roster_get_subscription_lists,
@@ -574,7 +651,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
[U, StateData#state.server]),
NewStateData = StateData#state{user = U,
resource = R,
- jid = JID, sid = SID,
+ jid = JID,
conn = Conn,
auth_module = AuthModule,
pres_f = (?SETS):from_list(Fs1),
@@ -625,6 +702,11 @@ wait_for_auth({xmlstreamerror, _}, StateData) ->
wait_for_auth(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El},
+ StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_feature_request,
+ dispatch_stream_mgmt(El, StateData));
wait_for_feature_request({xmlstreamelement, El},
StateData) ->
#xmlel{name = Name, attrs = Attrs, children = Els} = El,
@@ -790,6 +872,9 @@ wait_for_feature_request({xmlstreamerror, _},
wait_for_feature_request(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_sasl_response({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_sasl_response, dispatch_stream_mgmt(El, StateData));
wait_for_sasl_response({xmlstreamelement, El},
StateData) ->
#xmlel{name = Name, attrs = Attrs, children = Els} = El,
@@ -920,6 +1005,20 @@ resource_conflict_action(U, S, R) ->
{accept_resource, Rnew}
end.
+wait_for_bind({xmlstreamelement, #xmlel{name = Name, attrs = Attrs} = El},
+ StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ case Name of
+ <<"resume">> ->
+ case handle_resume(StateData, Attrs) of
+ {ok, ResumedState} ->
+ fsm_next_state(session_established, ResumedState);
+ error ->
+ fsm_next_state(wait_for_bind, StateData)
+ end;
+ _ ->
+ fsm_next_state(wait_for_bind, dispatch_stream_mgmt(El, StateData))
+ end;
wait_for_bind({xmlstreamelement, El}, StateData) ->
case jlib:iq_query_info(El) of
#iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} =
@@ -981,61 +1080,63 @@ wait_for_bind({xmlstreamerror, _}, StateData) ->
wait_for_bind(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_session({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_session, dispatch_stream_mgmt(El, StateData));
wait_for_session({xmlstreamelement, El}, StateData) ->
+ NewStateData = update_num_stanzas_in(StateData, El),
case jlib:iq_query_info(El) of
#iq{type = set, xmlns = ?NS_SESSION} ->
- U = StateData#state.user,
- R = StateData#state.resource,
- JID = StateData#state.jid,
- case acl:match_rule(StateData#state.server,
- StateData#state.access, JID) of
+ U = NewStateData#state.user,
+ R = NewStateData#state.resource,
+ JID = NewStateData#state.jid,
+ case acl:match_rule(NewStateData#state.server,
+ NewStateData#state.access, JID) of
allow ->
?INFO_MSG("(~w) Opened session for ~s",
- [StateData#state.socket,
+ [NewStateData#state.socket,
jlib:jid_to_string(JID)]),
Res = jlib:make_result_iq_reply(El#xmlel{children = []}),
- send_element(StateData, Res),
- change_shaper(StateData, JID),
+ NewState = send_stanza(NewStateData, Res),
+ change_shaper(NewState, JID),
{Fs, Ts} = ejabberd_hooks:run_fold(
roster_get_subscription_lists,
- StateData#state.server,
+ NewState#state.server,
{[], []},
- [U, StateData#state.server]),
+ [U, NewState#state.server]),
LJID = jlib:jid_tolower(jlib:jid_remove_resource(JID)),
Fs1 = [LJID | Fs],
Ts1 = [LJID | Ts],
PrivList =
ejabberd_hooks:run_fold(
- privacy_get_user_list, StateData#state.server,
+ privacy_get_user_list, NewState#state.server,
#userlist{},
- [U, StateData#state.server]),
- SID = {now(), self()},
- Conn = get_conn_type(StateData),
- Info = [{ip, StateData#state.ip}, {conn, Conn},
- {auth_module, StateData#state.auth_module}],
+ [U, NewState#state.server]),
+ Conn = get_conn_type(NewState),
+ Info = [{ip, NewState#state.ip}, {conn, Conn},
+ {auth_module, NewState#state.auth_module}],
ejabberd_sm:open_session(
- SID, U, StateData#state.server, R, Info),
- NewStateData =
- StateData#state{
- sid = SID,
+ NewState#state.sid, U, NewState#state.server, R, Info),
+ UpdatedStateData =
+ NewState#state{
conn = Conn,
pres_f = ?SETS:from_list(Fs1),
pres_t = ?SETS:from_list(Ts1),
privacy_list = PrivList},
fsm_next_state_pack(session_established,
- NewStateData);
+ UpdatedStateData);
_ ->
ejabberd_hooks:run(forbidden_session_hook,
- StateData#state.server, [JID]),
+ NewStateData#state.server, [JID]),
?INFO_MSG("(~w) Forbidden session for ~s",
- [StateData#state.socket,
+ [NewStateData#state.socket,
jlib:jid_to_string(JID)]),
Err = jlib:make_error_reply(El, ?ERR_NOT_ALLOWED),
- send_element(StateData, Err),
- fsm_next_state(wait_for_session, StateData)
+ send_element(NewStateData, Err),
+ fsm_next_state(wait_for_session, NewStateData)
end;
_ ->
- fsm_next_state(wait_for_session, StateData)
+ fsm_next_state(wait_for_session, NewStateData)
end;
wait_for_session(timeout, StateData) ->
@@ -1049,6 +1150,9 @@ wait_for_session({xmlstreamerror, _}, StateData) ->
wait_for_session(closed, StateData) ->
{stop, normal, StateData}.
+session_established({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(session_established, dispatch_stream_mgmt(El, StateData));
session_established({xmlstreamelement, El},
StateData) ->
FromJID = StateData#state.jid,
@@ -1080,6 +1184,12 @@ session_established({xmlstreamerror, _}, StateData) ->
send_element(StateData, ?INVALID_XML_ERR),
send_trailer(StateData),
{stop, normal, StateData};
+session_established(closed, StateData)
+ when StateData#state.mgmt_timeout > 0,
+ StateData#state.mgmt_state == active orelse
+ StateData#state.mgmt_state == pending ->
+ log_pending_state(StateData),
+ fsm_next_state(wait_for_resume, StateData#state{mgmt_state = pending});
session_established(closed, StateData) ->
{stop, normal, StateData}.
@@ -1087,9 +1197,10 @@ session_established(closed, StateData) ->
%% connection)
session_established2(El, StateData) ->
#xmlel{name = Name, attrs = Attrs} = El,
- User = StateData#state.user,
- Server = StateData#state.server,
- FromJID = StateData#state.jid,
+ NewStateData = update_num_stanzas_in(StateData, El),
+ User = NewStateData#state.user,
+ Server = NewStateData#state.server,
+ FromJID = NewStateData#state.jid,
To = xml:get_attr_s(<<"to">>, Attrs),
ToJID = case To of
<<"">> -> jlib:make_jid(User, Server, <<"">>);
@@ -1098,7 +1209,7 @@ session_established2(El, StateData) ->
NewEl1 = jlib:remove_attr(<<"xmlns">>, El),
NewEl = case xml:get_attr_s(<<"xml:lang">>, Attrs) of
<<"">> ->
- case StateData#state.lang of
+ case NewStateData#state.lang of
<<"">> -> NewEl1;
Lang ->
xml:replace_tag_attr(<<"xml:lang">>, Lang, NewEl1)
@@ -1108,13 +1219,18 @@ session_established2(El, StateData) ->
NewState = case ToJID of
error ->
case xml:get_attr_s(<<"type">>, Attrs) of
- <<"error">> -> StateData;
- <<"result">> -> StateData;
+ <<"error">> -> NewStateData;
+ <<"result">> -> NewStateData;
_ ->
Err = jlib:make_error_reply(NewEl,
?ERR_JID_MALFORMED),
- send_element(StateData, Err),
- StateData
+ case is_stanza(Err) of
+ true ->
+ send_stanza(NewStateData, Err);
+ false ->
+ send_element(NewStateData, Err),
+ NewStateData
+ end
end;
_ ->
case Name of
@@ -1129,12 +1245,12 @@ session_established2(El, StateData) ->
#jid{user = User, server = Server,
resource = <<"">>} ->
?DEBUG("presence_update(~p,~n\t~p,~n\t~p)",
- [FromJID, PresenceEl, StateData]),
+ [FromJID, PresenceEl, NewStateData]),
presence_update(FromJID, PresenceEl,
- StateData);
+ NewStateData);
_ ->
presence_track(FromJID, ToJID, PresenceEl,
- StateData)
+ NewStateData)
end;
<<"iq">> ->
case jlib:iq_query_info(NewEl) of
@@ -1142,27 +1258,38 @@ session_established2(El, StateData) ->
when Xmlns == (?NS_PRIVACY);
Xmlns == (?NS_BLOCKING) ->
process_privacy_iq(FromJID, ToJID, IQ,
- StateData);
+ NewStateData);
_ ->
ejabberd_hooks:run(user_send_packet, Server,
[FromJID, ToJID, NewEl]),
- check_privacy_route(FromJID, StateData,
+ check_privacy_route(FromJID, NewStateData,
FromJID, ToJID, NewEl),
- StateData
+ NewStateData
end;
<<"message">> ->
ejabberd_hooks:run(user_send_packet, Server,
[FromJID, ToJID, NewEl]),
- check_privacy_route(FromJID, StateData, FromJID,
+ check_privacy_route(FromJID, NewStateData, FromJID,
ToJID, NewEl),
- StateData;
- _ -> StateData
+ NewStateData;
+ _ -> NewStateData
end
end,
ejabberd_hooks:run(c2s_loop_debug,
[{xmlstreamelement, El}]),
fsm_next_state(session_established, NewState).
+wait_for_resume({xmlstreamelement, _El} = Event, StateData) ->
+ session_established(Event, StateData),
+ fsm_next_state(wait_for_resume, StateData);
+wait_for_resume(timeout, StateData) ->
+ ?DEBUG("Timed out waiting for resumption of stream for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]),
+ {stop, normal, StateData};
+wait_for_resume(Event, StateData) ->
+ ?DEBUG("Ignoring event while waiting for resumption: ~p", [Event]),
+ fsm_next_state(wait_for_resume, StateData).
+
%%----------------------------------------------------------------------
%% Func: StateName/3
%% Returns: {next_state, NextStateName, NextStateData} |
@@ -1207,6 +1334,15 @@ handle_sync_event(get_subscribed, _From, StateName,
StateData) ->
Subscribed = (?SETS):to_list(StateData#state.pres_f),
{reply, Subscribed, StateName, StateData};
+handle_sync_event(resume_session, _From, _StateName,
+ StateData) ->
+ %% The old session should be closed before the new one is opened, so we do
+ %% this here instead of leaving it to the terminate callback
+ ejabberd_sm:close_session(StateData#state.sid,
+ StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource),
+ {stop, normal, {ok, StateData}, StateData#state{mgmt_state = resumed}};
handle_sync_event(_Event, _From, StateName,
StateData) ->
Reply = ok, fsm_reply(Reply, StateName, StateData).
@@ -1275,13 +1411,13 @@ handle_info({route, _From, _To, {broadcast, Data}},
jlib:jid_remove_resource(StateData#state.jid),
StateData#state.jid,
jlib:iq_to_xml(PrivPushIQ)),
- send_element(StateData, PrivPushEl),
+ NewState = send_stanza(StateData, PrivPushEl),
fsm_next_state(StateName,
- StateData#state{privacy_list = NewPL})
+ NewState#state{privacy_list = NewPL})
end;
{blocking, What} ->
- route_blocking(What, StateData),
- fsm_next_state(StateName, StateData);
+ NewState = route_blocking(What, StateData),
+ fsm_next_state(StateName, NewState);
_ ->
fsm_next_state(StateName, StateData)
end;
@@ -1527,12 +1663,12 @@ handle_info({route, From, To,
jlib:replace_from_to_attrs(jlib:jid_to_string(From),
jlib:jid_to_string(To), NewAttrs),
FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els},
- send_element(StateData, FixedPacket),
+ SentStateData = send_stanza(StateData, FixedPacket),
ejabberd_hooks:run(user_receive_packet,
- StateData#state.server,
- [StateData#state.jid, From, To, FixedPacket]),
+ SentStateData#state.server,
+ [SentStateData#state.jid, From, To, FixedPacket]),
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
- fsm_next_state(StateName, NewState);
+ fsm_next_state(StateName, SentStateData);
true ->
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
fsm_next_state(StateName, NewState)
@@ -1540,7 +1676,15 @@ handle_info({route, From, To,
handle_info({'DOWN', Monitor, _Type, _Object, _Info},
_StateName, StateData)
when Monitor == StateData#state.socket_monitor ->
- {stop, normal, StateData};
+ if StateData#state.mgmt_timeout > 0,
+ StateData#state.mgmt_state == active orelse
+ StateData#state.mgmt_state == pending ->
+ log_pending_state(StateData),
+ fsm_next_state(wait_for_resume,
+ StateData#state{mgmt_state = pending});
+ true ->
+ {stop, normal, StateData}
+ end;
handle_info(system_shutdown, StateName, StateData) ->
case StateName of
wait_for_stream ->
@@ -1604,60 +1748,71 @@ print_state(State = #state{pres_t = T, pres_f = F, pres_a = A, pres_i = I}) ->
%% Returns: any
%%----------------------------------------------------------------------
terminate(_Reason, StateName, StateData) ->
- case StateName of
- session_established ->
- case StateData#state.authenticated of
- replaced ->
- ?INFO_MSG("(~w) Replaced session for ~s",
- [StateData#state.socket,
- jlib:jid_to_string(StateData#state.jid)]),
- From = StateData#state.jid,
- Packet = #xmlel{name = <<"presence">>,
- attrs = [{<<"type">>, <<"unavailable">>}],
- children =
- [#xmlel{name = <<"status">>, attrs = [],
- children =
- [{xmlcdata,
- <<"Replaced by new connection">>}]}]},
- ejabberd_sm:close_session_unset_presence(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource,
- <<"Replaced by new connection">>),
- presence_broadcast(StateData, From,
- StateData#state.pres_a, Packet),
- presence_broadcast(StateData, From,
- StateData#state.pres_i, Packet);
- _ ->
- ?INFO_MSG("(~w) Close session for ~s",
- [StateData#state.socket,
- jlib:jid_to_string(StateData#state.jid)]),
- EmptySet = (?SETS):new(),
- case StateData of
- #state{pres_last = undefined, pres_a = EmptySet, pres_i = EmptySet, pres_invis = false} ->
- ejabberd_sm:close_session(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource);
- _ ->
- From = StateData#state.jid,
- Packet = #xmlel{name = <<"presence">>,
- attrs = [{<<"type">>, <<"unavailable">>}],
- children = []},
- ejabberd_sm:close_session_unset_presence(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource,
- <<"">>),
- presence_broadcast(StateData, From,
- StateData#state.pres_a, Packet),
- presence_broadcast(StateData, From,
- StateData#state.pres_i, Packet)
- end
- end,
- bounce_messages();
+ case StateData#state.mgmt_state of
+ resumed ->
+ ?INFO_MSG("Closing former stream of resumed session for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]);
_ ->
- ok
+ if StateName == session_established;
+ StateName == wait_for_resume ->
+ case StateData#state.authenticated of
+ replaced ->
+ ?INFO_MSG("(~w) Replaced session for ~s",
+ [StateData#state.socket,
+ jlib:jid_to_string(StateData#state.jid)]),
+ From = StateData#state.jid,
+ Packet = #xmlel{name = <<"presence">>,
+ attrs = [{<<"type">>, <<"unavailable">>}],
+ children =
+ [#xmlel{name = <<"status">>, attrs = [],
+ children =
+ [{xmlcdata,
+ <<"Replaced by new connection">>}]}]},
+ ejabberd_sm:close_session_unset_presence(StateData#state.sid,
+ StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource,
+ <<"Replaced by new connection">>),
+ presence_broadcast(StateData, From,
+ StateData#state.pres_a, Packet),
+ presence_broadcast(StateData, From,
+ StateData#state.pres_i, Packet),
+ handle_unacked_stanzas(StateData);
+ _ ->
+ ?INFO_MSG("(~w) Close session for ~s",
+ [StateData#state.socket,
+ jlib:jid_to_string(StateData#state.jid)]),
+ EmptySet = (?SETS):new(),
+ case StateData of
+ #state{pres_last = undefined,
+ pres_a = EmptySet,
+ pres_i = EmptySet,
+ pres_invis = false} ->
+ ejabberd_sm:close_session(StateData#state.sid,
+ StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource);
+ _ ->
+ From = StateData#state.jid,
+ Packet = #xmlel{name = <<"presence">>,
+ attrs = [{<<"type">>, <<"unavailable">>}],
+ children = []},
+ ejabberd_sm:close_session_unset_presence(StateData#state.sid,
+ StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource,
+ <<"">>),
+ presence_broadcast(StateData, From,
+ StateData#state.pres_a, Packet),
+ presence_broadcast(StateData, From,
+ StateData#state.pres_i, Packet)
+ end,
+ handle_unacked_stanzas(StateData)
+ end,
+ bounce_messages();
+ true ->
+ ok
+ end
end,
(StateData#state.sockmod):close(StateData#state.socket),
ok.
@@ -1672,6 +1827,8 @@ change_shaper(StateData, JID) ->
(StateData#state.sockmod):change_shaper(StateData#state.socket,
Shaper).
+send_text(StateData, Text) when StateData#state.mgmt_state == pending ->
+ ?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]);
send_text(StateData, Text) when StateData#state.xml_socket ->
?DEBUG("Send Text on stream = ~p", [Text]),
(StateData#state.sockmod):send_xml(StateData#state.socket,
@@ -1680,12 +1837,23 @@ send_text(StateData, Text) ->
?DEBUG("Send XML on stream = ~p", [Text]),
(StateData#state.sockmod):send(StateData#state.socket, Text).
+send_element(StateData, El) when StateData#state.mgmt_state == pending ->
+ ?DEBUG("Cannot send element while waiting for resumption: ~p", [El]);
send_element(StateData, El) when StateData#state.xml_socket ->
(StateData#state.sockmod):send_xml(StateData#state.socket,
{xmlstreamelement, El});
send_element(StateData, El) ->
send_text(StateData, xml:element_to_binary(El)).
+send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending ->
+ mgmt_queue_add(StateData, Stanza);
+send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active ->
+ send_stanza_and_ack_req(StateData, Stanza),
+ mgmt_queue_add(StateData, Stanza);
+send_stanza(StateData, Stanza) ->
+ send_element(StateData, Stanza),
+ StateData.
+
send_header(StateData, Server, Version, Lang)
when StateData#state.xml_socket ->
VersionAttr = case Version of
@@ -1721,6 +1889,9 @@ send_header(StateData, Server, Version, Lang) ->
send_text(StateData, iolist_to_binary(Header)).
send_trailer(StateData)
+ when StateData#state.mgmt_state == pending ->
+ ?DEBUG("Cannot send stream trailer while waiting for resumption", []);
+send_trailer(StateData)
when StateData#state.xml_socket ->
(StateData#state.sockmod):send_xml(StateData#state.socket,
{xmlstreamend, <<"stream:stream">>});
@@ -1739,6 +1910,19 @@ is_auth_packet(El) ->
_ -> false
end.
+is_stanza(#xmlel{name = Name, attrs = Attrs}) when Name == <<"message">>;
+ Name == <<"presence">>;
+ Name == <<"iq">> ->
+ case xml:get_attr(<<"xmlns">>, Attrs) of
+ {value, NS} when NS /= <<"jabber:client">>,
+ NS /= <<"jabber:server">> ->
+ false;
+ _ ->
+ true
+ end;
+is_stanza(_El) ->
+ false.
+
get_auth_tags([#xmlel{name = Name, children = Els} | L],
U, P, D, R) ->
CData = xml:get_cdata(Els),
@@ -1866,12 +2050,12 @@ presence_update(From, Packet, StateData) ->
ejabberd_hooks:run(user_available_hook,
NewStateData#state.server,
[NewStateData#state.jid]),
- if NewPriority >= 0 ->
- resend_offline_messages(NewStateData),
- resend_subscription_requests(NewStateData);
- true -> ok
- end,
- presence_broadcast_first(From, NewStateData,
+ ResentStateData = if NewPriority >= 0 ->
+ resend_offline_messages(NewStateData),
+ resend_subscription_requests(NewStateData);
+ true -> NewStateData
+ end,
+ presence_broadcast_first(From, ResentStateData,
Packet);
true ->
presence_broadcast_to_trusted(NewStateData, From,
@@ -2168,11 +2352,6 @@ resend_offline_messages(StateData) ->
end,
if Pass ->
ejabberd_router:route(From, To, Packet);
- %% send_element(StateData, FixedPacket),
- %% ejabberd_hooks:run(user_receive_packet,
- %% StateData#state.server,
- %% [StateData#state.jid,
- %% From, To, FixedPacket]);
true -> ok
end
end,
@@ -2185,10 +2364,11 @@ resend_subscription_requests(#state{user = User,
PendingSubscriptions =
ejabberd_hooks:run_fold(resend_subscription_requests_hook,
Server, [], [User, Server]),
- lists:foreach(fun (XMLPacket) ->
- send_element(StateData, XMLPacket)
- end,
- PendingSubscriptions).
+ lists:foldl(fun (XMLPacket, AccStateData) ->
+ send_stanza(AccStateData, XMLPacket)
+ end,
+ StateData,
+ PendingSubscriptions).
get_showtag(undefined) -> <<"unavailable">>;
get_showtag(Presence) ->
@@ -2267,6 +2447,15 @@ fsm_next_state_gc(StateName, PackedStateData) ->
fsm_next_state(session_established, StateData) ->
{next_state, session_established, StateData,
?C2S_HIBERNATE_TIMEOUT};
+fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined} =
+ StateData) ->
+ {next_state, wait_for_resume,
+ StateData#state{mgmt_pending_since = os:timestamp()},
+ StateData#state.mgmt_timeout};
+fsm_next_state(wait_for_resume, StateData) ->
+ Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since),
+ Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1),
+ {next_state, wait_for_resume, StateData, Timeout};
fsm_next_state(StateName, StateData) ->
{next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
@@ -2275,6 +2464,15 @@ fsm_next_state(StateName, StateData) ->
fsm_reply(Reply, session_established, StateData) ->
{reply, Reply, session_established, StateData,
?C2S_HIBERNATE_TIMEOUT};
+fsm_reply(Reply, wait_for_resume, #state{mgmt_pending_since = undefined} =
+ StateData) ->
+ {reply, Reply, wait_for_resume,
+ StateData#state{mgmt_pending_since = os:timestamp()},
+ StateData#state.mgmt_timeout};
+fsm_reply(Reply, wait_for_resume, StateData) ->
+ Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since),
+ Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1),
+ {reply, Reply, wait_for_resume, StateData, Timeout};
fsm_reply(Reply, StateName, StateData) ->
{reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
@@ -2364,12 +2562,399 @@ route_blocking(What, StateData) ->
PrivPushEl =
jlib:replace_from_to(jlib:jid_remove_resource(StateData#state.jid),
StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)),
- send_element(StateData, PrivPushEl),
%% No need to replace active privacy list here,
%% blocking pushes are always accompanied by
%% Privacy List pushes
+ send_stanza(StateData, PrivPushEl).
+
+%%%----------------------------------------------------------------------
+%%% XEP-0198
+%%%----------------------------------------------------------------------
+
+stream_mgmt_enabled(#state{mgmt_state = disabled}) ->
+ false;
+stream_mgmt_enabled(_StateData) ->
+ true.
+
+dispatch_stream_mgmt(El, StateData)
+ when StateData#state.mgmt_state == active;
+ StateData#state.mgmt_state == pending ->
+ perform_stream_mgmt(El, StateData);
+dispatch_stream_mgmt(El, StateData) ->
+ negotiate_stream_mgmt(El, StateData).
+
+negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) ->
+ %% XEP-0198 says: "For client-to-server connections, the client MUST NOT
+ %% attempt to enable stream management until after it has completed Resource
+ %% Binding unless it is resuming a previous session". However, it also
+ %% says: "Stream management errors SHOULD be considered recoverable", so we
+ %% won't bail out.
+ send_element(StateData, ?MGMT_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)),
+ StateData;
+negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
+ case xml:get_attr_s(<<"xmlns">>, Attrs) of
+ Xmlns when ?IS_SUPPORTED_MGMT_XMLNS(Xmlns) ->
+ case stream_mgmt_enabled(StateData) of
+ true ->
+ case Name of
+ <<"enable">> ->
+ handle_enable(StateData#state{mgmt_xmlns = Xmlns}, Attrs);
+ _ ->
+ Res = if Name == <<"a">>;
+ Name == <<"r">>;
+ Name == <<"resume">> ->
+ ?MGMT_UNEXPECTED_REQUEST(Xmlns);
+ true ->
+ ?MGMT_BAD_REQUEST(Xmlns)
+ end,
+ send_element(StateData, Res),
+ StateData
+ end;
+ false ->
+ send_element(StateData, ?MGMT_SERVICE_UNAVAILABLE(Xmlns)),
+ StateData
+ end;
+ _ ->
+ send_element(StateData, ?MGMT_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)),
+ StateData
+ end.
+
+perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
+ case xml:get_attr_s(<<"xmlns">>, Attrs) of
+ Xmlns when Xmlns == StateData#state.mgmt_xmlns ->
+ case Name of
+ <<"r">> ->
+ handle_r(StateData);
+ <<"a">> ->
+ handle_a(StateData, Attrs);
+ _ ->
+ Res = if Name == <<"enable">>;
+ Name == <<"resume">> ->
+ ?MGMT_UNEXPECTED_REQUEST(Xmlns);
+ true ->
+ ?MGMT_BAD_REQUEST(Xmlns)
+ end,
+ send_element(StateData, Res),
+ StateData
+ end;
+ _ ->
+ send_element(StateData,
+ ?MGMT_UNSUPPORTED_VERSION(StateData#state.mgmt_xmlns)),
+ StateData
+ end.
+
+handle_enable(#state{mgmt_timeout = ConfigTimeout} = StateData, Attrs) ->
+ Timeout = case xml:get_attr_s(<<"resume">>, Attrs) of
+ ResumeAttr when ResumeAttr == <<"true">>;
+ ResumeAttr == <<"1">> ->
+ MaxAttr = xml:get_attr_s(<<"max">>, Attrs),
+ case catch jlib:binary_to_integer(MaxAttr) of
+ Max when is_integer(Max), Max > 0, Max =< ConfigTimeout ->
+ Max;
+ _ ->
+ ConfigTimeout
+ end;
+ _ ->
+ 0
+ end,
+ ResAttrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}] ++
+ if Timeout > 0 ->
+ ?INFO_MSG("Stream management with resumption enabled for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]),
+ [{<<"id">>, make_resume_id(StateData)},
+ {<<"resume">>, <<"true">>},
+ {<<"max">>, jlib:integer_to_binary(Timeout)}];
+ true ->
+ ?INFO_MSG("Stream management without resumption enabled for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]),
+ []
+ end,
+ Res = #xmlel{name = <<"enabled">>,
+ attrs = ResAttrs,
+ children = []},
+ send_element(StateData, Res),
+ StateData#state{mgmt_state = active,
+ mgmt_queue = queue:new(),
+ mgmt_timeout = Timeout * 1000}.
+
+handle_r(StateData) ->
+ H = jlib:integer_to_binary(StateData#state.mgmt_stanzas_in),
+ Res = #xmlel{name = <<"a">>,
+ attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns},
+ {<<"h">>, H}],
+ children = []},
+ send_element(StateData, Res),
+ StateData.
+
+handle_a(#state{jid = JID, mgmt_stanzas_out = NumStanzasOut} = StateData,
+ Attrs) ->
+ case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of
+ H when is_integer(H), H >= 0 ->
+ ?DEBUG("~s acknowledged ~B of ~B stanzas",
+ [jlib:jid_to_string(JID), H, NumStanzasOut]),
+ mgmt_queue_drop(StateData, H);
+ _ ->
+ ?WARNING_MSG("Ignoring invalid ACK element from ~s",
+ [jlib:jid_to_string(JID)]),
+ StateData
+ end.
+
+handle_resume(StateData, Attrs) ->
+ R = case xml:get_attr_s(<<"xmlns">>, Attrs) of
+ Xmlns when ?IS_SUPPORTED_MGMT_XMLNS(Xmlns) ->
+ case stream_mgmt_enabled(StateData) of
+ true ->
+ case {xml:get_attr(<<"previd">>, Attrs),
+ catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs))}
+ of
+ {{value, PrevID}, H} when is_integer(H) ->
+ case inherit_session_state(StateData, PrevID) of
+ {ok, InheritedState} ->
+ {ok, InheritedState, H};
+ {error, Err} ->
+ {error, ?MGMT_ITEM_NOT_FOUND(Xmlns), Err}
+ end;
+ _ ->
+ {error, ?MGMT_BAD_REQUEST(Xmlns),
+ <<"Invalid request">>}
+ end;
+ false ->
+ {error, ?MGMT_SERVICE_UNAVAILABLE(Xmlns),
+ <<"XEP-0198 disabled">>}
+ end;
+ _ ->
+ {error, ?MGMT_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3),
+ <<"Invalid XMLNS">>}
+ end,
+ case R of
+ {ok, ResumedState, NumHandled} ->
+ NewState = mgmt_queue_drop(ResumedState, NumHandled),
+ AttrXmlns = NewState#state.mgmt_xmlns,
+ AttrId = make_resume_id(NewState),
+ AttrH = jlib:integer_to_binary(NewState#state.mgmt_stanzas_in),
+ send_element(NewState,
+ #xmlel{name = <<"resumed">>,
+ attrs = [{<<"xmlns">>, AttrXmlns},
+ {<<"h">>, AttrH},
+ {<<"previd">>, AttrId}],
+ children = []}),
+ SendFun = fun(_F, _T, El) -> send_element(NewState, El) end,
+ handle_unacked_stanzas(NewState, SendFun),
+ send_element(NewState,
+ #xmlel{name = <<"r">>,
+ attrs = [{<<"xmlns">>, AttrXmlns}],
+ children = []}),
+ ?INFO_MSG("Resumed session for ~s",
+ [jlib:jid_to_string(NewState#state.jid)]),
+ {ok, NewState};
+ {error, El, Msg} ->
+ send_element(StateData, El),
+ ?INFO_MSG("Cannot resume session for ~s@~s: ~s",
+ [StateData#state.user, StateData#state.server, Msg]),
+ error
+ end.
+
+update_num_stanzas_in(#state{mgmt_state = active} = StateData, El) ->
+ NewNum = case {is_stanza(El), StateData#state.mgmt_stanzas_in} of
+ {true, 4294967295} ->
+ 0;
+ {true, Num} ->
+ Num + 1;
+ {false, Num} ->
+ Num
+ end,
+ StateData#state{mgmt_stanzas_in = NewNum};
+update_num_stanzas_in(StateData, _El) ->
+ StateData.
+
+send_stanza_and_ack_req(StateData, Stanza) ->
+ AckReq = #xmlel{name = <<"r">>,
+ attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}],
+ children = []},
+ StanzaS = xml:element_to_binary(Stanza),
+ AckReqS = xml:element_to_binary(AckReq),
+ send_text(StateData, [StanzaS, AckReqS]).
+
+mgmt_queue_add(StateData, El) ->
+ NewNum = case StateData#state.mgmt_stanzas_out of
+ 4294967295 ->
+ 0;
+ Num ->
+ Num + 1
+ end,
+ NewState = limit_queue_length(StateData),
+ NewQueue = queue:in({NewNum, El}, NewState#state.mgmt_queue),
+ NewState#state{mgmt_queue = NewQueue, mgmt_stanzas_out = NewNum}.
+
+mgmt_queue_drop(StateData, NumHandled) ->
+ NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end,
+ StateData#state.mgmt_queue),
+ StateData#state{mgmt_queue = NewQueue}.
+
+limit_queue_length(#state{mgmt_max_queue = Limit} = StateData)
+ when Limit == infinity;
+ Limit == unlimited ->
+ StateData;
+limit_queue_length(#state{jid = JID,
+ mgmt_queue = Queue,
+ mgmt_max_queue = Limit} = StateData) ->
+ case queue:len(Queue) >= Limit of
+ true ->
+ ?WARNING_MSG("Dropping stanza from too long ACK queue for ~s",
+ [jlib:jid_to_string(JID)]),
+ limit_queue_length(StateData#state{mgmt_queue = queue:drop(Queue)});
+ false ->
+ StateData
+ end.
+
+log_pending_state(StateData) when StateData#state.mgmt_state /= pending ->
+ ?INFO_MSG("Waiting for resumption of stream for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]);
+log_pending_state(_StateData) ->
+ ok.
+
+handle_unacked_stanzas(StateData, F)
+ when StateData#state.mgmt_state == active;
+ StateData#state.mgmt_state == pending ->
+ Queue = StateData#state.mgmt_queue,
+ case queue:len(Queue) of
+ 0 ->
+ ok;
+ N ->
+ ?INFO_MSG("~B stanzas were not acknowledged by ~s",
+ [N, jlib:jid_to_string(StateData#state.jid)]),
+ lists:foreach(
+ fun({_, #xmlel{attrs = Attrs} = El}) ->
+ From_s = xml:get_attr_s(<<"from">>, Attrs),
+ From = jlib:string_to_jid(From_s),
+ To_s = xml:get_attr_s(<<"to">>, Attrs),
+ To = jlib:string_to_jid(To_s),
+ F(From, To, El)
+ end, queue:to_list(Queue))
+ end;
+handle_unacked_stanzas(_StateData, _F) ->
+ ok.
+
+handle_unacked_stanzas(StateData)
+ when StateData#state.mgmt_state == active;
+ StateData#state.mgmt_state == pending ->
+ ReRoute = case StateData#state.mgmt_resend of
+ true ->
+ fun ejabberd_router:route/3;
+ false ->
+ fun(From, To, El) ->
+ Err =
+ jlib:make_error_reply(El,
+ ?ERR_SERVICE_UNAVAILABLE),
+ ejabberd_router:route(To, From, Err)
+ end
+ end,
+ F = fun(From, To, El) ->
+ %% We'll drop the stanza if it was <forwarded/> by some
+ %% encapsulating protocol as per XEP-0297. One such protocol is
+ %% XEP-0280, which says: "When a receiving server attempts to
+ %% deliver a forked message, and that message bounces with an
+ %% error for any reason, the receiving server MUST NOT forward
+ %% that error back to the original sender." Resending such a
+ %% stanza could easily lead to unexpected results as well.
+ case is_encapsulated_forward(El) of
+ true ->
+ ?DEBUG("Dropping forwarded stanza from ~s",
+ [xml:get_attr_s(<<"from">>, El#xmlel.attrs)]);
+ false ->
+ ReRoute(From, To, El)
+ end
+ end,
+ handle_unacked_stanzas(StateData, F);
+handle_unacked_stanzas(_StateData) ->
ok.
+is_encapsulated_forward(#xmlel{name = <<"message">>} = El) ->
+ SubTag = case {xml:get_subtag(El, <<"sent">>),
+ xml:get_subtag(El, <<"received">>),
+ xml:get_subtag(El, <<"result">>)} of
+ {false, false, false} ->
+ false;
+ {Tag, false, false} ->
+ Tag;
+ {false, Tag, false} ->
+ Tag;
+ {_, _, Tag} ->
+ Tag
+ end,
+ if SubTag == false ->
+ false;
+ true ->
+ case xml:get_subtag(SubTag, <<"forwarded">>) of
+ false ->
+ false;
+ _ ->
+ true
+ end
+ end;
+is_encapsulated_forward(_El) ->
+ false.
+
+inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
+ case jlib:base64_to_term(ResumeID) of
+ {term, {U, S, R, Time}} ->
+ case ejabberd_sm:get_session_pid(U, S, R) of
+ none ->
+ {error, <<"Previous session PID not found">>};
+ OldPID ->
+ OldSID = {Time, OldPID},
+ case catch resume_session(OldPID) of
+ {ok, #state{sid = OldSID} = OldStateData} ->
+ NewSID = {Time, self()}, % Old time, new PID
+ Priority = case OldStateData#state.pres_last of
+ undefined ->
+ 0;
+ Presence ->
+ get_priority_from_presence(Presence)
+ end,
+ Conn = get_conn_type(StateData),
+ Info = [{ip, StateData#state.ip}, {conn, Conn},
+ {auth_module, StateData#state.auth_module}],
+ ejabberd_sm:open_session(NewSID, U, S, R,
+ Priority, Info),
+ {ok, StateData#state{sid = NewSID,
+ jid = OldStateData#state.jid,
+ resource = OldStateData#state.resource,
+ pres_t = OldStateData#state.pres_t,
+ pres_f = OldStateData#state.pres_f,
+ pres_a = OldStateData#state.pres_a,
+ pres_i = OldStateData#state.pres_i,
+ pres_last = OldStateData#state.pres_last,
+ pres_pri = OldStateData#state.pres_pri,
+ pres_timestamp = OldStateData#state.pres_timestamp,
+ pres_invis = OldStateData#state.pres_invis,
+ privacy_list = OldStateData#state.privacy_list,
+ aux_fields = OldStateData#state.aux_fields,
+ mgmt_xmlns = OldStateData#state.mgmt_xmlns,
+ mgmt_queue = OldStateData#state.mgmt_queue,
+ mgmt_timeout = OldStateData#state.mgmt_timeout,
+ mgmt_stanzas_in = OldStateData#state.mgmt_stanzas_in,
+ mgmt_stanzas_out = OldStateData#state.mgmt_stanzas_out,
+ mgmt_state = active}};
+ _ ->
+ {error, <<"Cannot grab session state">>}
+ end
+ end;
+ error ->
+ {error, <<"Invalid 'previd' value">>}
+ end.
+
+resume_session(FsmRef) ->
+ (?GEN_FSM):sync_send_all_state_event(FsmRef, resume_session, 3000).
+
+make_resume_id(StateData) ->
+ {Time, _} = StateData#state.sid,
+ ID = {StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource,
+ Time},
+ jlib:term_to_base64(ID).
+
%%%----------------------------------------------------------------------
%%% JID Set memory footprint reduction code
%%%----------------------------------------------------------------------
diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl
index 2a06fd2f8..094918cd9 100644
--- a/src/ejabberd_sm.erl
+++ b/src/ejabberd_sm.erl
@@ -32,7 +32,9 @@
%% API
-export([start_link/0,
route/3,
- open_session/5, close_session/4,
+ open_session/5,
+ open_session/6,
+ close_session/4,
check_in_subscription/6,
bounce_offline_message/3,
disconnect_removed_user/2,
@@ -109,10 +111,10 @@ route(From, To, Packet) ->
_ -> ok
end.
--spec open_session(sid(), binary(), binary(), binary(), info()) -> ok.
+-spec open_session(sid(), binary(), binary(), binary(), prio(), info()) -> ok.
-open_session(SID, User, Server, Resource, Info) ->
- set_session(SID, User, Server, Resource, undefined, Info),
+open_session(SID, User, Server, Resource, Priority, Info) ->
+ set_session(SID, User, Server, Resource, Priority, Info),
mnesia:dirty_update_counter(session_counter,
jlib:nameprep(Server), 1),
check_for_sessions_to_replace(User, Server, Resource),
@@ -120,6 +122,11 @@ open_session(SID, User, Server, Resource, Info) ->
ejabberd_hooks:run(sm_register_connection_hook,
JID#jid.lserver, [SID, JID, Info]).
+-spec open_session(sid(), binary(), binary(), binary(), info()) -> ok.
+
+open_session(SID, User, Server, Resource, Info) ->
+ open_session(SID, User, Server, Resource, undefined, Info).
+
-spec close_session(sid(), binary(), binary(), binary()) -> ok.
close_session(SID, User, Server, Resource) ->
diff --git a/src/jlib.erl b/src/jlib.erl
index 0ff210652..7735d7dbc 100644
--- a/src/jlib.erl
+++ b/src/jlib.erl
@@ -45,12 +45,13 @@
timestamp_to_iso/2, timestamp_to_xml/4,
timestamp_to_xml/1, now_to_utc_string/1,
now_to_local_string/1, datetime_string_to_timestamp/1,
+ term_to_base64/1, base64_to_term/1,
decode_base64/1, encode_base64/1, ip_to_list/1,
rsm_encode/1, rsm_encode/2, rsm_decode/1,
binary_to_integer/1, binary_to_integer/2,
integer_to_binary/1, integer_to_binary/2,
atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1,
- l2i/1, i2l/1, i2l/2]).
+ l2i/1, i2l/1, i2l/2, queue_drop_while/2]).
%% TODO: Remove once XEP-0091 is Obsolete
%% TODO: Remove once XEP-0091 is Obsolete
@@ -779,6 +780,21 @@ check_list(List) ->
% Base64 stuff (based on httpd_util.erl)
%
+-spec term_to_base64(term()) -> binary().
+
+term_to_base64(Term) ->
+ encode_base64(term_to_binary(Term)).
+
+-spec base64_to_term(binary()) -> {term, term()} | error.
+
+base64_to_term(Base64) ->
+ case catch binary_to_term(decode_base64(Base64), [safe]) of
+ {'EXIT', _} ->
+ error;
+ Term ->
+ {term, Term}
+ end.
+
-spec decode_base64(binary()) -> binary().
decode_base64(S) ->
@@ -893,3 +909,18 @@ i2l(L, N) when is_binary(L) ->
C when C > N -> L;
_ -> i2l(<<$0, L/binary>>, N)
end.
+
+-spec queue_drop_while(fun((term()) -> boolean()), queue()) -> queue().
+
+queue_drop_while(F, Q) ->
+ case queue:peek(Q) of
+ {value, Item} ->
+ case F(Item) of
+ true ->
+ queue_drop_while(F, queue:drop(Q));
+ _ ->
+ Q
+ end;
+ empty ->
+ Q
+ end.