aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ejabberd_auth_external.erl8
-rw-r--r--src/ejabberd_c2s.erl840
-rw-r--r--src/ejabberd_listener.erl51
-rw-r--r--src/ejabberd_router.erl1
-rw-r--r--src/ejabberd_sm.erl17
-rw-r--r--src/ejabberd_system_monitor.erl5
-rw-r--r--src/jlib.erl33
-rw-r--r--src/mod_muc_room.erl43
-rw-r--r--src/mod_offline.erl2
-rw-r--r--src/mod_pubsub.erl71
-rw-r--r--src/mod_pubsub_odbc.erl80
-rw-r--r--src/mod_sip.erl279
-rw-r--r--src/mod_sip_proxy.erl277
-rw-r--r--src/mod_sip_registrar.erl336
14 files changed, 1819 insertions, 224 deletions
diff --git a/src/ejabberd_auth_external.erl b/src/ejabberd_auth_external.erl
index 9ae6c9081..51c1c620a 100644
--- a/src/ejabberd_auth_external.erl
+++ b/src/ejabberd_auth_external.erl
@@ -54,10 +54,8 @@ start(Host) ->
end,
"extauth"),
extauth:start(Host, Cmd),
- case check_cache_last_options(Host) of
- cache -> ok = ejabberd_auth_internal:start(Host);
- no_cache -> ok
- end.
+ check_cache_last_options(Host),
+ ejabberd_auth_internal:start(Host).
check_cache_last_options(Server) ->
case get_cache_option(Server) of
@@ -173,7 +171,7 @@ get_cache_option(Host) ->
case ejabberd_config:get_option(
{extauth_cache, Host},
fun(false) -> undefined;
- (I) when is_integer(I), I > 0 -> I
+ (I) when is_integer(I), I >= 0 -> I
end) of
undefined -> false;
CacheTime -> {true, CacheTime}
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index 87b32c43c..8874c48ad 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -57,6 +57,7 @@
wait_for_bind/2,
wait_for_session/2,
wait_for_sasl_response/2,
+ wait_for_resume/2,
session_established/2,
handle_event/3,
handle_sync_event/4,
@@ -107,6 +108,15 @@
auth_module = unknown,
ip,
aux_fields = [],
+ mgmt_state,
+ mgmt_xmlns,
+ mgmt_queue,
+ mgmt_max_queue,
+ mgmt_pending_since,
+ mgmt_timeout,
+ mgmt_resend,
+ mgmt_stanzas_in = 0,
+ mgmt_stanzas_out = 0,
lang = <<"">>}).
%-define(DBGFSM, true).
@@ -155,6 +165,39 @@
-define(INVALID_FROM, ?SERR_INVALID_FROM).
+%% XEP-0198:
+
+-define(IS_STREAM_MGMT_TAG(Name),
+ Name == <<"enable">>;
+ Name == <<"resume">>;
+ Name == <<"a">>;
+ Name == <<"r">>).
+
+-define(IS_SUPPORTED_MGMT_XMLNS(Xmlns),
+ Xmlns == ?NS_STREAM_MGMT_2;
+ Xmlns == ?NS_STREAM_MGMT_3).
+
+-define(MGMT_FAILED(Condition, Xmlns),
+ #xmlel{name = <<"failed">>,
+ attrs = [{<<"xmlns">>, Xmlns}],
+ children = [#xmlel{name = Condition,
+ attrs = [{<<"xmlns">>, ?NS_STANZAS}],
+ children = []}]}).
+
+-define(MGMT_BAD_REQUEST(Xmlns),
+ ?MGMT_FAILED(<<"bad-request">>, Xmlns)).
+
+-define(MGMT_ITEM_NOT_FOUND(Xmlns),
+ ?MGMT_FAILED(<<"item-not-found">>, Xmlns)).
+
+-define(MGMT_SERVICE_UNAVAILABLE(Xmlns),
+ ?MGMT_FAILED(<<"service-unavailable">>, Xmlns)).
+
+-define(MGMT_UNEXPECTED_REQUEST(Xmlns),
+ ?MGMT_FAILED(<<"unexpected-request">>, Xmlns)).
+
+-define(MGMT_UNSUPPORTED_VERSION(Xmlns),
+ ?MGMT_FAILED(<<"unsupported-version">>, Xmlns)).
%%%----------------------------------------------------------------------
%%% API
@@ -258,6 +301,19 @@ init([{SockMod, Socket}, Opts]) ->
true -> TLSOpts2
end,
TLSOpts = [verify_none | TLSOpts3],
+ StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true),
+ StreamMgmtState = if StreamMgmtEnabled -> inactive;
+ true -> disabled
+ end,
+ MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of
+ Limit when is_integer(Limit), Limit > 0 -> Limit;
+ _ -> 500
+ end,
+ ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of
+ Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout;
+ _ -> 300
+ end,
+ ResendOnTimeout = proplists:get_bool(resend_on_timeout, Opts),
IP = peerip(SockMod, Socket),
%% Check if IP is blacklisted:
case is_ip_blacklisted(IP) of
@@ -278,8 +334,12 @@ init([{SockMod, Socket}, Opts]) ->
xml_socket = XMLSocket, zlib = Zlib, tls = TLS,
tls_required = StartTLSRequired,
tls_enabled = TLSEnabled, tls_options = TLSOpts,
- streamid = new_id(), access = Access,
- shaper = Shaper, ip = IP},
+ sid = {now(), self()}, streamid = new_id(),
+ access = Access, shaper = Shaper, ip = IP,
+ mgmt_state = StreamMgmtState,
+ mgmt_max_queue = MaxAckQueue,
+ mgmt_timeout = ResumeTimeout,
+ mgmt_resend = ResendOnTimeout},
{ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}
end.
@@ -410,6 +470,18 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
ejabberd_hooks:run_fold(roster_get_versioning_feature,
Server, [],
[Server]),
+ StreamManagementFeature =
+ case stream_mgmt_enabled(StateData) of
+ true ->
+ [#xmlel{name = <<"sm">>,
+ attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_2}],
+ children = []},
+ #xmlel{name = <<"sm">>,
+ attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_3}],
+ children = []}];
+ false ->
+ []
+ end,
StreamFeatures = [#xmlel{name = <<"bind">>,
attrs = [{<<"xmlns">>, ?NS_BIND}],
children = []},
@@ -418,6 +490,7 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
children = []}]
++
RosterVersioningFeature ++
+ StreamManagementFeature ++
ejabberd_hooks:run_fold(c2s_stream_features,
Server, [], [Server]),
send_element(StateData,
@@ -480,6 +553,9 @@ wait_for_stream({xmlstreamerror, _}, StateData) ->
wait_for_stream(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_auth({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_auth, dispatch_stream_mgmt(El, StateData));
wait_for_auth({xmlstreamelement, El}, StateData) ->
case is_auth_packet(El) of
{auth, _ID, get, {U, _, _, _}} ->
@@ -549,15 +625,15 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
?INFO_MSG("(~w) Accepted legacy authentication for ~s by ~p",
[StateData#state.socket,
jlib:jid_to_string(JID), AuthModule]),
- SID = {now(), self()},
- Conn = (StateData#state.sockmod):get_conn_type(
- StateData#state.socket),
+ Conn = get_conn_type(StateData),
Info = [{ip, StateData#state.ip}, {conn, Conn},
{auth_module, AuthModule}],
Res = jlib:make_result_iq_reply(
El#xmlel{children = []}),
send_element(StateData, Res),
- ejabberd_sm:open_session(SID, U, StateData#state.server, R, Info),
+ ejabberd_sm:open_session(StateData#state.sid, U,
+ StateData#state.server, R,
+ Info),
change_shaper(StateData, JID),
{Fs, Ts} =
ejabberd_hooks:run_fold(roster_get_subscription_lists,
@@ -575,7 +651,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
[U, StateData#state.server]),
NewStateData = StateData#state{user = U,
resource = R,
- jid = JID, sid = SID,
+ jid = JID,
conn = Conn,
auth_module = AuthModule,
pres_f = (?SETS):from_list(Fs1),
@@ -626,6 +702,11 @@ wait_for_auth({xmlstreamerror, _}, StateData) ->
wait_for_auth(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El},
+ StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_feature_request,
+ dispatch_stream_mgmt(El, StateData));
wait_for_feature_request({xmlstreamelement, El},
StateData) ->
#xmlel{name = Name, attrs = Attrs, children = Els} = El,
@@ -791,6 +872,9 @@ wait_for_feature_request({xmlstreamerror, _},
wait_for_feature_request(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_sasl_response({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_sasl_response, dispatch_stream_mgmt(El, StateData));
wait_for_sasl_response({xmlstreamelement, El},
StateData) ->
#xmlel{name = Name, attrs = Attrs, children = Els} = El,
@@ -921,6 +1005,20 @@ resource_conflict_action(U, S, R) ->
{accept_resource, Rnew}
end.
+wait_for_bind({xmlstreamelement, #xmlel{name = Name, attrs = Attrs} = El},
+ StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ case Name of
+ <<"resume">> ->
+ case handle_resume(StateData, Attrs) of
+ {ok, ResumedState} ->
+ fsm_next_state(session_established, ResumedState);
+ error ->
+ fsm_next_state(wait_for_bind, StateData)
+ end;
+ _ ->
+ fsm_next_state(wait_for_bind, dispatch_stream_mgmt(El, StateData))
+ end;
wait_for_bind({xmlstreamelement, El}, StateData) ->
case jlib:iq_query_info(El) of
#iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} =
@@ -982,61 +1080,63 @@ wait_for_bind({xmlstreamerror, _}, StateData) ->
wait_for_bind(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_session({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_session, dispatch_stream_mgmt(El, StateData));
wait_for_session({xmlstreamelement, El}, StateData) ->
+ NewStateData = update_num_stanzas_in(StateData, El),
case jlib:iq_query_info(El) of
#iq{type = set, xmlns = ?NS_SESSION} ->
- U = StateData#state.user,
- R = StateData#state.resource,
- JID = StateData#state.jid,
- case acl:match_rule(StateData#state.server,
- StateData#state.access, JID) of
+ U = NewStateData#state.user,
+ R = NewStateData#state.resource,
+ JID = NewStateData#state.jid,
+ case acl:match_rule(NewStateData#state.server,
+ NewStateData#state.access, JID) of
allow ->
?INFO_MSG("(~w) Opened session for ~s",
- [StateData#state.socket,
+ [NewStateData#state.socket,
jlib:jid_to_string(JID)]),
Res = jlib:make_result_iq_reply(El#xmlel{children = []}),
- send_element(StateData, Res),
- change_shaper(StateData, JID),
+ NewState = send_stanza(NewStateData, Res),
+ change_shaper(NewState, JID),
{Fs, Ts} = ejabberd_hooks:run_fold(
roster_get_subscription_lists,
- StateData#state.server,
+ NewState#state.server,
{[], []},
- [U, StateData#state.server]),
+ [U, NewState#state.server]),
LJID = jlib:jid_tolower(jlib:jid_remove_resource(JID)),
Fs1 = [LJID | Fs],
Ts1 = [LJID | Ts],
PrivList =
ejabberd_hooks:run_fold(
- privacy_get_user_list, StateData#state.server,
+ privacy_get_user_list, NewState#state.server,
#userlist{},
- [U, StateData#state.server]),
- SID = {now(), self()},
- Conn = get_conn_type(StateData),
- Info = [{ip, StateData#state.ip}, {conn, Conn},
- {auth_module, StateData#state.auth_module}],
+ [U, NewState#state.server]),
+ Conn = get_conn_type(NewState),
+ Info = [{ip, NewState#state.ip}, {conn, Conn},
+ {auth_module, NewState#state.auth_module}],
ejabberd_sm:open_session(
- SID, U, StateData#state.server, R, Info),
- NewStateData =
- StateData#state{
- sid = SID,
+ NewState#state.sid, U, NewState#state.server, R, Info),
+ UpdatedStateData =
+ NewState#state{
conn = Conn,
pres_f = ?SETS:from_list(Fs1),
pres_t = ?SETS:from_list(Ts1),
privacy_list = PrivList},
fsm_next_state_pack(session_established,
- NewStateData);
+ UpdatedStateData);
_ ->
ejabberd_hooks:run(forbidden_session_hook,
- StateData#state.server, [JID]),
+ NewStateData#state.server, [JID]),
?INFO_MSG("(~w) Forbidden session for ~s",
- [StateData#state.socket,
+ [NewStateData#state.socket,
jlib:jid_to_string(JID)]),
Err = jlib:make_error_reply(El, ?ERR_NOT_ALLOWED),
- send_element(StateData, Err),
- fsm_next_state(wait_for_session, StateData)
+ send_element(NewStateData, Err),
+ fsm_next_state(wait_for_session, NewStateData)
end;
_ ->
- fsm_next_state(wait_for_session, StateData)
+ fsm_next_state(wait_for_session, NewStateData)
end;
wait_for_session(timeout, StateData) ->
@@ -1050,6 +1150,9 @@ wait_for_session({xmlstreamerror, _}, StateData) ->
wait_for_session(closed, StateData) ->
{stop, normal, StateData}.
+session_established({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(session_established, dispatch_stream_mgmt(El, StateData));
session_established({xmlstreamelement, El},
StateData) ->
FromJID = StateData#state.jid,
@@ -1081,6 +1184,12 @@ session_established({xmlstreamerror, _}, StateData) ->
send_element(StateData, ?INVALID_XML_ERR),
send_trailer(StateData),
{stop, normal, StateData};
+session_established(closed, StateData)
+ when StateData#state.mgmt_timeout > 0,
+ StateData#state.mgmt_state == active orelse
+ StateData#state.mgmt_state == pending ->
+ log_pending_state(StateData),
+ fsm_next_state(wait_for_resume, StateData#state{mgmt_state = pending});
session_established(closed, StateData) ->
{stop, normal, StateData}.
@@ -1088,9 +1197,10 @@ session_established(closed, StateData) ->
%% connection)
session_established2(El, StateData) ->
#xmlel{name = Name, attrs = Attrs} = El,
- User = StateData#state.user,
- Server = StateData#state.server,
- FromJID = StateData#state.jid,
+ NewStateData = update_num_stanzas_in(StateData, El),
+ User = NewStateData#state.user,
+ Server = NewStateData#state.server,
+ FromJID = NewStateData#state.jid,
To = xml:get_attr_s(<<"to">>, Attrs),
ToJID = case To of
<<"">> -> jlib:make_jid(User, Server, <<"">>);
@@ -1099,7 +1209,7 @@ session_established2(El, StateData) ->
NewEl1 = jlib:remove_attr(<<"xmlns">>, El),
NewEl = case xml:get_attr_s(<<"xml:lang">>, Attrs) of
<<"">> ->
- case StateData#state.lang of
+ case NewStateData#state.lang of
<<"">> -> NewEl1;
Lang ->
xml:replace_tag_attr(<<"xml:lang">>, Lang, NewEl1)
@@ -1109,13 +1219,18 @@ session_established2(El, StateData) ->
NewState = case ToJID of
error ->
case xml:get_attr_s(<<"type">>, Attrs) of
- <<"error">> -> StateData;
- <<"result">> -> StateData;
+ <<"error">> -> NewStateData;
+ <<"result">> -> NewStateData;
_ ->
Err = jlib:make_error_reply(NewEl,
?ERR_JID_MALFORMED),
- send_element(StateData, Err),
- StateData
+ case is_stanza(Err) of
+ true ->
+ send_stanza(NewStateData, Err);
+ false ->
+ send_element(NewStateData, Err),
+ NewStateData
+ end
end;
_ ->
case Name of
@@ -1130,12 +1245,12 @@ session_established2(El, StateData) ->
#jid{user = User, server = Server,
resource = <<"">>} ->
?DEBUG("presence_update(~p,~n\t~p,~n\t~p)",
- [FromJID, PresenceEl, StateData]),
+ [FromJID, PresenceEl, NewStateData]),
presence_update(FromJID, PresenceEl,
- StateData);
+ NewStateData);
_ ->
presence_track(FromJID, ToJID, PresenceEl,
- StateData)
+ NewStateData)
end;
<<"iq">> ->
case jlib:iq_query_info(NewEl) of
@@ -1143,27 +1258,38 @@ session_established2(El, StateData) ->
when Xmlns == (?NS_PRIVACY);
Xmlns == (?NS_BLOCKING) ->
process_privacy_iq(FromJID, ToJID, IQ,
- StateData);
+ NewStateData);
_ ->
ejabberd_hooks:run(user_send_packet, Server,
[FromJID, ToJID, NewEl]),
- check_privacy_route(FromJID, StateData,
+ check_privacy_route(FromJID, NewStateData,
FromJID, ToJID, NewEl),
- StateData
+ NewStateData
end;
<<"message">> ->
ejabberd_hooks:run(user_send_packet, Server,
[FromJID, ToJID, NewEl]),
- check_privacy_route(FromJID, StateData, FromJID,
+ check_privacy_route(FromJID, NewStateData, FromJID,
ToJID, NewEl),
- StateData;
- _ -> StateData
+ NewStateData;
+ _ -> NewStateData
end
end,
ejabberd_hooks:run(c2s_loop_debug,
[{xmlstreamelement, El}]),
fsm_next_state(session_established, NewState).
+wait_for_resume({xmlstreamelement, _El} = Event, StateData) ->
+ session_established(Event, StateData),
+ fsm_next_state(wait_for_resume, StateData);
+wait_for_resume(timeout, StateData) ->
+ ?DEBUG("Timed out waiting for resumption of stream for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]),
+ {stop, normal, StateData};
+wait_for_resume(Event, StateData) ->
+ ?DEBUG("Ignoring event while waiting for resumption: ~p", [Event]),
+ fsm_next_state(wait_for_resume, StateData).
+
%%----------------------------------------------------------------------
%% Func: StateName/3
%% Returns: {next_state, NextStateName, NextStateData} |
@@ -1208,6 +1334,15 @@ handle_sync_event(get_subscribed, _From, StateName,
StateData) ->
Subscribed = (?SETS):to_list(StateData#state.pres_f),
{reply, Subscribed, StateName, StateData};
+handle_sync_event(resume_session, _From, _StateName,
+ StateData) ->
+ %% The old session should be closed before the new one is opened, so we do
+ %% this here instead of leaving it to the terminate callback
+ ejabberd_sm:close_session(StateData#state.sid,
+ StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource),
+ {stop, normal, {ok, StateData}, StateData#state{mgmt_state = resumed}};
handle_sync_event(_Event, _From, StateName,
StateData) ->
Reply = ok, fsm_reply(Reply, StateName, StateData).
@@ -1276,13 +1411,13 @@ handle_info({route, _From, _To, {broadcast, Data}},
jlib:jid_remove_resource(StateData#state.jid),
StateData#state.jid,
jlib:iq_to_xml(PrivPushIQ)),
- send_element(StateData, PrivPushEl),
+ NewState = send_stanza(StateData, PrivPushEl),
fsm_next_state(StateName,
- StateData#state{privacy_list = NewPL})
+ NewState#state{privacy_list = NewPL})
end;
{blocking, What} ->
- route_blocking(What, StateData),
- fsm_next_state(StateName, StateData);
+ NewState = route_blocking(What, StateData),
+ fsm_next_state(StateName, NewState);
_ ->
fsm_next_state(StateName, StateData)
end;
@@ -1528,12 +1663,12 @@ handle_info({route, From, To,
jlib:replace_from_to_attrs(jlib:jid_to_string(From),
jlib:jid_to_string(To), NewAttrs),
FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els},
- send_element(StateData, FixedPacket),
+ SentStateData = send_stanza(StateData, FixedPacket),
ejabberd_hooks:run(user_receive_packet,
- StateData#state.server,
- [StateData#state.jid, From, To, FixedPacket]),
+ SentStateData#state.server,
+ [SentStateData#state.jid, From, To, FixedPacket]),
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
- fsm_next_state(StateName, NewState);
+ fsm_next_state(StateName, SentStateData);
true ->
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
fsm_next_state(StateName, NewState)
@@ -1541,7 +1676,15 @@ handle_info({route, From, To,
handle_info({'DOWN', Monitor, _Type, _Object, _Info},
_StateName, StateData)
when Monitor == StateData#state.socket_monitor ->
- {stop, normal, StateData};
+ if StateData#state.mgmt_timeout > 0,
+ StateData#state.mgmt_state == active orelse
+ StateData#state.mgmt_state == pending ->
+ log_pending_state(StateData),
+ fsm_next_state(wait_for_resume,
+ StateData#state{mgmt_state = pending});
+ true ->
+ {stop, normal, StateData}
+ end;
handle_info(system_shutdown, StateName, StateData) ->
case StateName of
wait_for_stream ->
@@ -1605,60 +1748,71 @@ print_state(State = #state{pres_t = T, pres_f = F, pres_a = A, pres_i = I}) ->
%% Returns: any
%%----------------------------------------------------------------------
terminate(_Reason, StateName, StateData) ->
- case StateName of
- session_established ->
- case StateData#state.authenticated of
- replaced ->
- ?INFO_MSG("(~w) Replaced session for ~s",
- [StateData#state.socket,
- jlib:jid_to_string(StateData#state.jid)]),
- From = StateData#state.jid,
- Packet = #xmlel{name = <<"presence">>,
- attrs = [{<<"type">>, <<"unavailable">>}],
- children =
- [#xmlel{name = <<"status">>, attrs = [],
- children =
- [{xmlcdata,
- <<"Replaced by new connection">>}]}]},
- ejabberd_sm:close_session_unset_presence(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource,
- <<"Replaced by new connection">>),
- presence_broadcast(StateData, From,
- StateData#state.pres_a, Packet),
- presence_broadcast(StateData, From,
- StateData#state.pres_i, Packet);
- _ ->
- ?INFO_MSG("(~w) Close session for ~s",
- [StateData#state.socket,
- jlib:jid_to_string(StateData#state.jid)]),
- EmptySet = (?SETS):new(),
- case StateData of
- #state{pres_last = undefined, pres_a = EmptySet, pres_i = EmptySet, pres_invis = false} ->
- ejabberd_sm:close_session(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource);
- _ ->
- From = StateData#state.jid,
- Packet = #xmlel{name = <<"presence">>,
- attrs = [{<<"type">>, <<"unavailable">>}],
- children = []},
- ejabberd_sm:close_session_unset_presence(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource,
- <<"">>),
- presence_broadcast(StateData, From,
- StateData#state.pres_a, Packet),
- presence_broadcast(StateData, From,
- StateData#state.pres_i, Packet)
- end
- end,
- bounce_messages();
+ case StateData#state.mgmt_state of
+ resumed ->
+ ?INFO_MSG("Closing former stream of resumed session for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]);
_ ->
- ok
+ if StateName == session_established;
+ StateName == wait_for_resume ->
+ case StateData#state.authenticated of
+ replaced ->
+ ?INFO_MSG("(~w) Replaced session for ~s",
+ [StateData#state.socket,
+ jlib:jid_to_string(StateData#state.jid)]),
+ From = StateData#state.jid,
+ Packet = #xmlel{name = <<"presence">>,
+ attrs = [{<<"type">>, <<"unavailable">>}],
+ children =
+ [#xmlel{name = <<"status">>, attrs = [],
+ children =
+ [{xmlcdata,
+ <<"Replaced by new connection">>}]}]},
+ ejabberd_sm:close_session_unset_presence(StateData#state.sid,
+ StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource,
+ <<"Replaced by new connection">>),
+ presence_broadcast(StateData, From,
+ StateData#state.pres_a, Packet),
+ presence_broadcast(StateData, From,
+ StateData#state.pres_i, Packet),
+ handle_unacked_stanzas(StateData);
+ _ ->
+ ?INFO_MSG("(~w) Close session for ~s",
+ [StateData#state.socket,
+ jlib:jid_to_string(StateData#state.jid)]),
+ EmptySet = (?SETS):new(),
+ case StateData of
+ #state{pres_last = undefined,
+ pres_a = EmptySet,
+ pres_i = EmptySet,
+ pres_invis = false} ->
+ ejabberd_sm:close_session(StateData#state.sid,
+ StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource);
+ _ ->
+ From = StateData#state.jid,
+ Packet = #xmlel{name = <<"presence">>,
+ attrs = [{<<"type">>, <<"unavailable">>}],
+ children = []},
+ ejabberd_sm:close_session_unset_presence(StateData#state.sid,
+ StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource,
+ <<"">>),
+ presence_broadcast(StateData, From,
+ StateData#state.pres_a, Packet),
+ presence_broadcast(StateData, From,
+ StateData#state.pres_i, Packet)
+ end,
+ handle_unacked_stanzas(StateData)
+ end,
+ bounce_messages();
+ true ->
+ ok
+ end
end,
(StateData#state.sockmod):close(StateData#state.socket),
ok.
@@ -1673,6 +1827,8 @@ change_shaper(StateData, JID) ->
(StateData#state.sockmod):change_shaper(StateData#state.socket,
Shaper).
+send_text(StateData, Text) when StateData#state.mgmt_state == pending ->
+ ?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]);
send_text(StateData, Text) when StateData#state.xml_socket ->
?DEBUG("Send Text on stream = ~p", [Text]),
(StateData#state.sockmod):send_xml(StateData#state.socket,
@@ -1681,12 +1837,23 @@ send_text(StateData, Text) ->
?DEBUG("Send XML on stream = ~p", [Text]),
(StateData#state.sockmod):send(StateData#state.socket, Text).
+send_element(StateData, El) when StateData#state.mgmt_state == pending ->
+ ?DEBUG("Cannot send element while waiting for resumption: ~p", [El]);
send_element(StateData, El) when StateData#state.xml_socket ->
(StateData#state.sockmod):send_xml(StateData#state.socket,
{xmlstreamelement, El});
send_element(StateData, El) ->
send_text(StateData, xml:element_to_binary(El)).
+send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending ->
+ mgmt_queue_add(StateData, Stanza);
+send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active ->
+ send_stanza_and_ack_req(StateData, Stanza),
+ mgmt_queue_add(StateData, Stanza);
+send_stanza(StateData, Stanza) ->
+ send_element(StateData, Stanza),
+ StateData.
+
send_header(StateData, Server, Version, Lang)
when StateData#state.xml_socket ->
VersionAttr = case Version of
@@ -1722,6 +1889,9 @@ send_header(StateData, Server, Version, Lang) ->
send_text(StateData, iolist_to_binary(Header)).
send_trailer(StateData)
+ when StateData#state.mgmt_state == pending ->
+ ?DEBUG("Cannot send stream trailer while waiting for resumption", []);
+send_trailer(StateData)
when StateData#state.xml_socket ->
(StateData#state.sockmod):send_xml(StateData#state.socket,
{xmlstreamend, <<"stream:stream">>});
@@ -1740,6 +1910,19 @@ is_auth_packet(El) ->
_ -> false
end.
+is_stanza(#xmlel{name = Name, attrs = Attrs}) when Name == <<"message">>;
+ Name == <<"presence">>;
+ Name == <<"iq">> ->
+ case xml:get_attr(<<"xmlns">>, Attrs) of
+ {value, NS} when NS /= <<"jabber:client">>,
+ NS /= <<"jabber:server">> ->
+ false;
+ _ ->
+ true
+ end;
+is_stanza(_El) ->
+ false.
+
get_auth_tags([#xmlel{name = Name, children = Els} | L],
U, P, D, R) ->
CData = xml:get_cdata(Els),
@@ -1867,12 +2050,12 @@ presence_update(From, Packet, StateData) ->
ejabberd_hooks:run(user_available_hook,
NewStateData#state.server,
[NewStateData#state.jid]),
- if NewPriority >= 0 ->
- resend_offline_messages(NewStateData),
- resend_subscription_requests(NewStateData);
- true -> ok
- end,
- presence_broadcast_first(From, NewStateData,
+ ResentStateData = if NewPriority >= 0 ->
+ resend_offline_messages(NewStateData),
+ resend_subscription_requests(NewStateData);
+ true -> NewStateData
+ end,
+ presence_broadcast_first(From, ResentStateData,
Packet);
true ->
presence_broadcast_to_trusted(NewStateData, From,
@@ -2169,11 +2352,6 @@ resend_offline_messages(StateData) ->
end,
if Pass ->
ejabberd_router:route(From, To, Packet);
- %% send_element(StateData, FixedPacket),
- %% ejabberd_hooks:run(user_receive_packet,
- %% StateData#state.server,
- %% [StateData#state.jid,
- %% From, To, FixedPacket]);
true -> ok
end
end,
@@ -2186,10 +2364,11 @@ resend_subscription_requests(#state{user = User,
PendingSubscriptions =
ejabberd_hooks:run_fold(resend_subscription_requests_hook,
Server, [], [User, Server]),
- lists:foreach(fun (XMLPacket) ->
- send_element(StateData, XMLPacket)
- end,
- PendingSubscriptions).
+ lists:foldl(fun (XMLPacket, AccStateData) ->
+ send_stanza(AccStateData, XMLPacket)
+ end,
+ StateData,
+ PendingSubscriptions).
get_showtag(undefined) -> <<"unavailable">>;
get_showtag(Presence) ->
@@ -2268,6 +2447,15 @@ fsm_next_state_gc(StateName, PackedStateData) ->
fsm_next_state(session_established, StateData) ->
{next_state, session_established, StateData,
?C2S_HIBERNATE_TIMEOUT};
+fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined} =
+ StateData) ->
+ {next_state, wait_for_resume,
+ StateData#state{mgmt_pending_since = os:timestamp()},
+ StateData#state.mgmt_timeout};
+fsm_next_state(wait_for_resume, StateData) ->
+ Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since),
+ Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1),
+ {next_state, wait_for_resume, StateData, Timeout};
fsm_next_state(StateName, StateData) ->
{next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
@@ -2276,6 +2464,15 @@ fsm_next_state(StateName, StateData) ->
fsm_reply(Reply, session_established, StateData) ->
{reply, Reply, session_established, StateData,
?C2S_HIBERNATE_TIMEOUT};
+fsm_reply(Reply, wait_for_resume, #state{mgmt_pending_since = undefined} =
+ StateData) ->
+ {reply, Reply, wait_for_resume,
+ StateData#state{mgmt_pending_since = os:timestamp()},
+ StateData#state.mgmt_timeout};
+fsm_reply(Reply, wait_for_resume, StateData) ->
+ Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since),
+ Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1),
+ {reply, Reply, wait_for_resume, StateData, Timeout};
fsm_reply(Reply, StateName, StateData) ->
{reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
@@ -2365,12 +2562,399 @@ route_blocking(What, StateData) ->
PrivPushEl =
jlib:replace_from_to(jlib:jid_remove_resource(StateData#state.jid),
StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)),
- send_element(StateData, PrivPushEl),
%% No need to replace active privacy list here,
%% blocking pushes are always accompanied by
%% Privacy List pushes
+ send_stanza(StateData, PrivPushEl).
+
+%%%----------------------------------------------------------------------
+%%% XEP-0198
+%%%----------------------------------------------------------------------
+
+stream_mgmt_enabled(#state{mgmt_state = disabled}) ->
+ false;
+stream_mgmt_enabled(_StateData) ->
+ true.
+
+dispatch_stream_mgmt(El, StateData)
+ when StateData#state.mgmt_state == active;
+ StateData#state.mgmt_state == pending ->
+ perform_stream_mgmt(El, StateData);
+dispatch_stream_mgmt(El, StateData) ->
+ negotiate_stream_mgmt(El, StateData).
+
+negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) ->
+ %% XEP-0198 says: "For client-to-server connections, the client MUST NOT
+ %% attempt to enable stream management until after it has completed Resource
+ %% Binding unless it is resuming a previous session". However, it also
+ %% says: "Stream management errors SHOULD be considered recoverable", so we
+ %% won't bail out.
+ send_element(StateData, ?MGMT_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)),
+ StateData;
+negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
+ case xml:get_attr_s(<<"xmlns">>, Attrs) of
+ Xmlns when ?IS_SUPPORTED_MGMT_XMLNS(Xmlns) ->
+ case stream_mgmt_enabled(StateData) of
+ true ->
+ case Name of
+ <<"enable">> ->
+ handle_enable(StateData#state{mgmt_xmlns = Xmlns}, Attrs);
+ _ ->
+ Res = if Name == <<"a">>;
+ Name == <<"r">>;
+ Name == <<"resume">> ->
+ ?MGMT_UNEXPECTED_REQUEST(Xmlns);
+ true ->
+ ?MGMT_BAD_REQUEST(Xmlns)
+ end,
+ send_element(StateData, Res),
+ StateData
+ end;
+ false ->
+ send_element(StateData, ?MGMT_SERVICE_UNAVAILABLE(Xmlns)),
+ StateData
+ end;
+ _ ->
+ send_element(StateData, ?MGMT_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)),
+ StateData
+ end.
+
+perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
+ case xml:get_attr_s(<<"xmlns">>, Attrs) of
+ Xmlns when Xmlns == StateData#state.mgmt_xmlns ->
+ case Name of
+ <<"r">> ->
+ handle_r(StateData);
+ <<"a">> ->
+ handle_a(StateData, Attrs);
+ _ ->
+ Res = if Name == <<"enable">>;
+ Name == <<"resume">> ->
+ ?MGMT_UNEXPECTED_REQUEST(Xmlns);
+ true ->
+ ?MGMT_BAD_REQUEST(Xmlns)
+ end,
+ send_element(StateData, Res),
+ StateData
+ end;
+ _ ->
+ send_element(StateData,
+ ?MGMT_UNSUPPORTED_VERSION(StateData#state.mgmt_xmlns)),
+ StateData
+ end.
+
+handle_enable(#state{mgmt_timeout = ConfigTimeout} = StateData, Attrs) ->
+ Timeout = case xml:get_attr_s(<<"resume">>, Attrs) of
+ ResumeAttr when ResumeAttr == <<"true">>;
+ ResumeAttr == <<"1">> ->
+ MaxAttr = xml:get_attr_s(<<"max">>, Attrs),
+ case catch jlib:binary_to_integer(MaxAttr) of
+ Max when is_integer(Max), Max > 0, Max =< ConfigTimeout ->
+ Max;
+ _ ->
+ ConfigTimeout
+ end;
+ _ ->
+ 0
+ end,
+ ResAttrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}] ++
+ if Timeout > 0 ->
+ ?INFO_MSG("Stream management with resumption enabled for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]),
+ [{<<"id">>, make_resume_id(StateData)},
+ {<<"resume">>, <<"true">>},
+ {<<"max">>, jlib:integer_to_binary(Timeout)}];
+ true ->
+ ?INFO_MSG("Stream management without resumption enabled for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]),
+ []
+ end,
+ Res = #xmlel{name = <<"enabled">>,
+ attrs = ResAttrs,
+ children = []},
+ send_element(StateData, Res),
+ StateData#state{mgmt_state = active,
+ mgmt_queue = queue:new(),
+ mgmt_timeout = Timeout * 1000}.
+
+handle_r(StateData) ->
+ H = jlib:integer_to_binary(StateData#state.mgmt_stanzas_in),
+ Res = #xmlel{name = <<"a">>,
+ attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns},
+ {<<"h">>, H}],
+ children = []},
+ send_element(StateData, Res),
+ StateData.
+
+handle_a(#state{jid = JID, mgmt_stanzas_out = NumStanzasOut} = StateData,
+ Attrs) ->
+ case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of
+ H when is_integer(H), H >= 0 ->
+ ?DEBUG("~s acknowledged ~B of ~B stanzas",
+ [jlib:jid_to_string(JID), H, NumStanzasOut]),
+ mgmt_queue_drop(StateData, H);
+ _ ->
+ ?WARNING_MSG("Ignoring invalid ACK element from ~s",
+ [jlib:jid_to_string(JID)]),
+ StateData
+ end.
+
+handle_resume(StateData, Attrs) ->
+ R = case xml:get_attr_s(<<"xmlns">>, Attrs) of
+ Xmlns when ?IS_SUPPORTED_MGMT_XMLNS(Xmlns) ->
+ case stream_mgmt_enabled(StateData) of
+ true ->
+ case {xml:get_attr(<<"previd">>, Attrs),
+ catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs))}
+ of
+ {{value, PrevID}, H} when is_integer(H) ->
+ case inherit_session_state(StateData, PrevID) of
+ {ok, InheritedState} ->
+ {ok, InheritedState, H};
+ {error, Err} ->
+ {error, ?MGMT_ITEM_NOT_FOUND(Xmlns), Err}
+ end;
+ _ ->
+ {error, ?MGMT_BAD_REQUEST(Xmlns),
+ <<"Invalid request">>}
+ end;
+ false ->
+ {error, ?MGMT_SERVICE_UNAVAILABLE(Xmlns),
+ <<"XEP-0198 disabled">>}
+ end;
+ _ ->
+ {error, ?MGMT_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3),
+ <<"Invalid XMLNS">>}
+ end,
+ case R of
+ {ok, ResumedState, NumHandled} ->
+ NewState = mgmt_queue_drop(ResumedState, NumHandled),
+ AttrXmlns = NewState#state.mgmt_xmlns,
+ AttrId = make_resume_id(NewState),
+ AttrH = jlib:integer_to_binary(NewState#state.mgmt_stanzas_in),
+ send_element(NewState,
+ #xmlel{name = <<"resumed">>,
+ attrs = [{<<"xmlns">>, AttrXmlns},
+ {<<"h">>, AttrH},
+ {<<"previd">>, AttrId}],
+ children = []}),
+ SendFun = fun(_F, _T, El) -> send_element(NewState, El) end,
+ handle_unacked_stanzas(NewState, SendFun),
+ send_element(NewState,
+ #xmlel{name = <<"r">>,
+ attrs = [{<<"xmlns">>, AttrXmlns}],
+ children = []}),
+ ?INFO_MSG("Resumed session for ~s",
+ [jlib:jid_to_string(NewState#state.jid)]),
+ {ok, NewState};
+ {error, El, Msg} ->
+ send_element(StateData, El),
+ ?INFO_MSG("Cannot resume session for ~s@~s: ~s",
+ [StateData#state.user, StateData#state.server, Msg]),
+ error
+ end.
+
+update_num_stanzas_in(#state{mgmt_state = active} = StateData, El) ->
+ NewNum = case {is_stanza(El), StateData#state.mgmt_stanzas_in} of
+ {true, 4294967295} ->
+ 0;
+ {true, Num} ->
+ Num + 1;
+ {false, Num} ->
+ Num
+ end,
+ StateData#state{mgmt_stanzas_in = NewNum};
+update_num_stanzas_in(StateData, _El) ->
+ StateData.
+
+send_stanza_and_ack_req(StateData, Stanza) ->
+ AckReq = #xmlel{name = <<"r">>,
+ attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}],
+ children = []},
+ StanzaS = xml:element_to_binary(Stanza),
+ AckReqS = xml:element_to_binary(AckReq),
+ send_text(StateData, [StanzaS, AckReqS]).
+
+mgmt_queue_add(StateData, El) ->
+ NewNum = case StateData#state.mgmt_stanzas_out of
+ 4294967295 ->
+ 0;
+ Num ->
+ Num + 1
+ end,
+ NewState = limit_queue_length(StateData),
+ NewQueue = queue:in({NewNum, El}, NewState#state.mgmt_queue),
+ NewState#state{mgmt_queue = NewQueue, mgmt_stanzas_out = NewNum}.
+
+mgmt_queue_drop(StateData, NumHandled) ->
+ NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end,
+ StateData#state.mgmt_queue),
+ StateData#state{mgmt_queue = NewQueue}.
+
+limit_queue_length(#state{mgmt_max_queue = Limit} = StateData)
+ when Limit == infinity;
+ Limit == unlimited ->
+ StateData;
+limit_queue_length(#state{jid = JID,
+ mgmt_queue = Queue,
+ mgmt_max_queue = Limit} = StateData) ->
+ case queue:len(Queue) >= Limit of
+ true ->
+ ?WARNING_MSG("Dropping stanza from too long ACK queue for ~s",
+ [jlib:jid_to_string(JID)]),
+ limit_queue_length(StateData#state{mgmt_queue = queue:drop(Queue)});
+ false ->
+ StateData
+ end.
+
+log_pending_state(StateData) when StateData#state.mgmt_state /= pending ->
+ ?INFO_MSG("Waiting for resumption of stream for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]);
+log_pending_state(_StateData) ->
+ ok.
+
+handle_unacked_stanzas(StateData, F)
+ when StateData#state.mgmt_state == active;
+ StateData#state.mgmt_state == pending ->
+ Queue = StateData#state.mgmt_queue,
+ case queue:len(Queue) of
+ 0 ->
+ ok;
+ N ->
+ ?INFO_MSG("~B stanzas were not acknowledged by ~s",
+ [N, jlib:jid_to_string(StateData#state.jid)]),
+ lists:foreach(
+ fun({_, #xmlel{attrs = Attrs} = El}) ->
+ From_s = xml:get_attr_s(<<"from">>, Attrs),
+ From = jlib:string_to_jid(From_s),
+ To_s = xml:get_attr_s(<<"to">>, Attrs),
+ To = jlib:string_to_jid(To_s),
+ F(From, To, El)
+ end, queue:to_list(Queue))
+ end;
+handle_unacked_stanzas(_StateData, _F) ->
+ ok.
+
+handle_unacked_stanzas(StateData)
+ when StateData#state.mgmt_state == active;
+ StateData#state.mgmt_state == pending ->
+ ReRoute = case StateData#state.mgmt_resend of
+ true ->
+ fun ejabberd_router:route/3;
+ false ->
+ fun(From, To, El) ->
+ Err =
+ jlib:make_error_reply(El,
+ ?ERR_SERVICE_UNAVAILABLE),
+ ejabberd_router:route(To, From, Err)
+ end
+ end,
+ F = fun(From, To, El) ->
+ %% We'll drop the stanza if it was <forwarded/> by some
+ %% encapsulating protocol as per XEP-0297. One such protocol is
+ %% XEP-0280, which says: "When a receiving server attempts to
+ %% deliver a forked message, and that message bounces with an
+ %% error for any reason, the receiving server MUST NOT forward
+ %% that error back to the original sender." Resending such a
+ %% stanza could easily lead to unexpected results as well.
+ case is_encapsulated_forward(El) of
+ true ->
+ ?DEBUG("Dropping forwarded stanza from ~s",
+ [xml:get_attr_s(<<"from">>, El#xmlel.attrs)]);
+ false ->
+ ReRoute(From, To, El)
+ end
+ end,
+ handle_unacked_stanzas(StateData, F);
+handle_unacked_stanzas(_StateData) ->
ok.
+is_encapsulated_forward(#xmlel{name = <<"message">>} = El) ->
+ SubTag = case {xml:get_subtag(El, <<"sent">>),
+ xml:get_subtag(El, <<"received">>),
+ xml:get_subtag(El, <<"result">>)} of
+ {false, false, false} ->
+ false;
+ {Tag, false, false} ->
+ Tag;
+ {false, Tag, false} ->
+ Tag;
+ {_, _, Tag} ->
+ Tag
+ end,
+ if SubTag == false ->
+ false;
+ true ->
+ case xml:get_subtag(SubTag, <<"forwarded">>) of
+ false ->
+ false;
+ _ ->
+ true
+ end
+ end;
+is_encapsulated_forward(_El) ->
+ false.
+
+inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
+ case jlib:base64_to_term(ResumeID) of
+ {term, {U, S, R, Time}} ->
+ case ejabberd_sm:get_session_pid(U, S, R) of
+ none ->
+ {error, <<"Previous session PID not found">>};
+ OldPID ->
+ OldSID = {Time, OldPID},
+ case catch resume_session(OldPID) of
+ {ok, #state{sid = OldSID} = OldStateData} ->
+ NewSID = {Time, self()}, % Old time, new PID
+ Priority = case OldStateData#state.pres_last of
+ undefined ->
+ 0;
+ Presence ->
+ get_priority_from_presence(Presence)
+ end,
+ Conn = get_conn_type(StateData),
+ Info = [{ip, StateData#state.ip}, {conn, Conn},
+ {auth_module, StateData#state.auth_module}],
+ ejabberd_sm:open_session(NewSID, U, S, R,
+ Priority, Info),
+ {ok, StateData#state{sid = NewSID,
+ jid = OldStateData#state.jid,
+ resource = OldStateData#state.resource,
+ pres_t = OldStateData#state.pres_t,
+ pres_f = OldStateData#state.pres_f,
+ pres_a = OldStateData#state.pres_a,
+ pres_i = OldStateData#state.pres_i,
+ pres_last = OldStateData#state.pres_last,
+ pres_pri = OldStateData#state.pres_pri,
+ pres_timestamp = OldStateData#state.pres_timestamp,
+ pres_invis = OldStateData#state.pres_invis,
+ privacy_list = OldStateData#state.privacy_list,
+ aux_fields = OldStateData#state.aux_fields,
+ mgmt_xmlns = OldStateData#state.mgmt_xmlns,
+ mgmt_queue = OldStateData#state.mgmt_queue,
+ mgmt_timeout = OldStateData#state.mgmt_timeout,
+ mgmt_stanzas_in = OldStateData#state.mgmt_stanzas_in,
+ mgmt_stanzas_out = OldStateData#state.mgmt_stanzas_out,
+ mgmt_state = active}};
+ _ ->
+ {error, <<"Cannot grab session state">>}
+ end
+ end;
+ error ->
+ {error, <<"Invalid 'previd' value">>}
+ end.
+
+resume_session(FsmRef) ->
+ (?GEN_FSM):sync_send_all_state_event(FsmRef, resume_session, 3000).
+
+make_resume_id(StateData) ->
+ {Time, _} = StateData#state.sid,
+ ID = {StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource,
+ Time},
+ jlib:term_to_base64(ID).
+
%%%----------------------------------------------------------------------
%%% JID Set memory footprint reduction code
%%%----------------------------------------------------------------------
diff --git a/src/ejabberd_listener.erl b/src/ejabberd_listener.erl
index 2051afdb2..844080a04 100644
--- a/src/ejabberd_listener.erl
+++ b/src/ejabberd_listener.erl
@@ -151,7 +151,20 @@ init_udp(PortIP, Module, Opts, SockOpts, Port, IPS) ->
{ok, Socket} ->
%% Inform my parent that this port was opened succesfully
proc_lib:init_ack({ok, self()}),
- udp_recv(Socket, Module, Opts);
+ case erlang:function_exported(Module, udp_init, 2) of
+ false ->
+ udp_recv(Socket, Module, Opts);
+ true ->
+ case catch Module:udp_init(Socket, Opts) of
+ {'EXIT', _} = Err ->
+ ?ERROR_MSG("failed to process callback function "
+ "~p:~s(~p, ~p): ~p",
+ [Module, udp_init, Socket, Opts, Err]),
+ udp_recv(Socket, Module, Opts);
+ NewOpts ->
+ udp_recv(Socket, Module, NewOpts)
+ end
+ end;
{error, Reason} ->
socket_error(Reason, PortIP, Module, SockOpts, Port, IPS)
end.
@@ -160,8 +173,20 @@ init_tcp(PortIP, Module, Opts, SockOpts, Port, IPS) ->
ListenSocket = listen_tcp(PortIP, Module, SockOpts, Port, IPS),
%% Inform my parent that this port was opened succesfully
proc_lib:init_ack({ok, self()}),
- %% And now start accepting connection attempts
- accept(ListenSocket, Module, Opts).
+ case erlang:function_exported(Module, tcp_init, 2) of
+ false ->
+ accept(ListenSocket, Module, Opts);
+ true ->
+ case catch Module:tcp_init(ListenSocket, Opts) of
+ {'EXIT', _} = Err ->
+ ?ERROR_MSG("failed to process callback function "
+ "~p:~s(~p, ~p): ~p",
+ [Module, tcp_init, ListenSocket, Opts, Err]),
+ accept(ListenSocket, Module, Opts);
+ NewOpts ->
+ accept(ListenSocket, Module, NewOpts)
+ end
+ end.
listen_tcp(PortIP, Module, SockOpts, Port, IPS) ->
case ets:lookup(listen_sockets, PortIP) of
@@ -311,11 +336,11 @@ udp_recv(Socket, Module, Opts) ->
?ERROR_MSG("failed to process UDP packet:~n"
"** Source: {~p, ~p}~n"
"** Reason: ~p~n** Packet: ~p",
- [Addr, Port, Reason, Packet]);
- _ ->
- ok
- end,
- udp_recv(Socket, Module, Opts);
+ [Addr, Port, Reason, Packet]),
+ udp_recv(Socket, Module, Opts);
+ NewOpts ->
+ udp_recv(Socket, Module, NewOpts)
+ end;
{error, Reason} ->
?ERROR_MSG("unexpected UDP error: ~s", [format_error(Reason)]),
throw({error, Reason})
@@ -342,6 +367,7 @@ start_listener2(Port, Module, Opts) ->
%% But it doesn't hurt to attempt to start it for any listener.
%% So, it's normal (and harmless) that in most cases this call returns: {error, {already_started, pid()}}
maybe_start_stun(Module),
+ maybe_start_sip(Module),
start_module_sup(Port, Module),
start_listener_sup(Port, Module, Opts).
@@ -463,6 +489,11 @@ maybe_start_stun(ejabberd_stun) ->
maybe_start_stun(_) ->
ok.
+maybe_start_sip(esip_socket) ->
+ ejabberd:start_app(esip);
+maybe_start_sip(_) ->
+ ok.
+
%%%
%%% Check options
%%%
@@ -642,7 +673,11 @@ prepare_ip(IP) when is_binary(IP) ->
prepare_mod(ejabberd_stun) ->
prepare_mod(stun);
+prepare_mod(ejabberd_sip) ->
+ prepare_mod(sip);
prepare_mod(stun) ->
stun;
+prepare_mod(sip) ->
+ esip_socket;
prepare_mod(Mod) when is_atom(Mod) ->
Mod.
diff --git a/src/ejabberd_router.erl b/src/ejabberd_router.erl
index caf444fba..70a01ee4e 100644
--- a/src/ejabberd_router.erl
+++ b/src/ejabberd_router.erl
@@ -396,6 +396,7 @@ update_tables() ->
[domain, node, pid] -> mnesia:delete_table(route);
[domain, pid] -> mnesia:delete_table(route);
[domain, pid, local_hint] -> ok;
+ [domain, pid, local_hint|_] -> mnesia:delete_table(route);
{'EXIT', _} -> ok
end,
case lists:member(local_route,
diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl
index 58debf0c1..094918cd9 100644
--- a/src/ejabberd_sm.erl
+++ b/src/ejabberd_sm.erl
@@ -32,7 +32,9 @@
%% API
-export([start_link/0,
route/3,
- open_session/5, close_session/4,
+ open_session/5,
+ open_session/6,
+ close_session/4,
check_in_subscription/6,
bounce_offline_message/3,
disconnect_removed_user/2,
@@ -56,6 +58,7 @@
get_session_pid/3,
get_user_info/3,
get_user_ip/3,
+ get_max_user_sessions/2,
is_existing_resource/3
]).
@@ -108,10 +111,10 @@ route(From, To, Packet) ->
_ -> ok
end.
--spec open_session(sid(), binary(), binary(), binary(), info()) -> ok.
+-spec open_session(sid(), binary(), binary(), binary(), prio(), info()) -> ok.
-open_session(SID, User, Server, Resource, Info) ->
- set_session(SID, User, Server, Resource, undefined, Info),
+open_session(SID, User, Server, Resource, Priority, Info) ->
+ set_session(SID, User, Server, Resource, Priority, Info),
mnesia:dirty_update_counter(session_counter,
jlib:nameprep(Server), 1),
check_for_sessions_to_replace(User, Server, Resource),
@@ -119,6 +122,11 @@ open_session(SID, User, Server, Resource, Info) ->
ejabberd_hooks:run(sm_register_connection_hook,
JID#jid.lserver, [SID, JID, Info]).
+-spec open_session(sid(), binary(), binary(), binary(), info()) -> ok.
+
+open_session(SID, User, Server, Resource, Info) ->
+ open_session(SID, User, Server, Resource, undefined, Info).
+
-spec close_session(sid(), binary(), binary(), binary()) -> ok.
close_session(SID, User, Server, Resource) ->
@@ -842,6 +850,7 @@ update_tables() ->
[ur, user, node] -> mnesia:delete_table(session);
[ur, user, pid] -> mnesia:delete_table(session);
[usr, us, pid] -> mnesia:delete_table(session);
+ [usr, us, sid, priority, info] -> mnesia:delete_table(session);
[sid, usr, us, priority] ->
mnesia:delete_table(session);
[sid, usr, us, priority, info] -> ok;
diff --git a/src/ejabberd_system_monitor.erl b/src/ejabberd_system_monitor.erl
index 011c02c40..368c5a0ff 100644
--- a/src/ejabberd_system_monitor.erl
+++ b/src/ejabberd_system_monitor.erl
@@ -244,9 +244,8 @@ s2s_out_info(Pid) ->
[<<"Process type: s2s_out">>,
case FromTo of
[{From, To}] ->
- <<"\n",
- (io_lib:format("S2S connection: from ~s to ~s",
- [From, To]))/binary>>;
+ list_to_binary(io_lib:format("\nS2S connection: from ~s to ~s",
+ [From, To]));
_ -> <<"">>
end,
check_send_queue(Pid), <<"\n">>,
diff --git a/src/jlib.erl b/src/jlib.erl
index 0ff210652..7735d7dbc 100644
--- a/src/jlib.erl
+++ b/src/jlib.erl
@@ -45,12 +45,13 @@
timestamp_to_iso/2, timestamp_to_xml/4,
timestamp_to_xml/1, now_to_utc_string/1,
now_to_local_string/1, datetime_string_to_timestamp/1,
+ term_to_base64/1, base64_to_term/1,
decode_base64/1, encode_base64/1, ip_to_list/1,
rsm_encode/1, rsm_encode/2, rsm_decode/1,
binary_to_integer/1, binary_to_integer/2,
integer_to_binary/1, integer_to_binary/2,
atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1,
- l2i/1, i2l/1, i2l/2]).
+ l2i/1, i2l/1, i2l/2, queue_drop_while/2]).
%% TODO: Remove once XEP-0091 is Obsolete
%% TODO: Remove once XEP-0091 is Obsolete
@@ -779,6 +780,21 @@ check_list(List) ->
% Base64 stuff (based on httpd_util.erl)
%
+-spec term_to_base64(term()) -> binary().
+
+term_to_base64(Term) ->
+ encode_base64(term_to_binary(Term)).
+
+-spec base64_to_term(binary()) -> {term, term()} | error.
+
+base64_to_term(Base64) ->
+ case catch binary_to_term(decode_base64(Base64), [safe]) of
+ {'EXIT', _} ->
+ error;
+ Term ->
+ {term, Term}
+ end.
+
-spec decode_base64(binary()) -> binary().
decode_base64(S) ->
@@ -893,3 +909,18 @@ i2l(L, N) when is_binary(L) ->
C when C > N -> L;
_ -> i2l(<<$0, L/binary>>, N)
end.
+
+-spec queue_drop_while(fun((term()) -> boolean()), queue()) -> queue().
+
+queue_drop_while(F, Q) ->
+ case queue:peek(Q) of
+ {value, Item} ->
+ case F(Item) of
+ true ->
+ queue_drop_while(F, queue:drop(Q));
+ _ ->
+ Q
+ end;
+ empty ->
+ Q
+ end.
diff --git a/src/mod_muc_room.erl b/src/mod_muc_room.erl
index 461ab1da2..3842fde40 100644
--- a/src/mod_muc_room.erl
+++ b/src/mod_muc_room.erl
@@ -245,7 +245,7 @@ normal_state({route, From, <<"">>,
NewState = expulse_participant(Packet, From, StateData,
translate:translate(Lang,
ErrorText)),
- {next_state, normal_state, NewState};
+ close_room_if_temporary_and_empty(NewState);
_ -> {next_state, normal_state, StateData}
end;
<<"chat">> ->
@@ -418,12 +418,13 @@ normal_state({route, From, <<"">>,
StateData) ->
case jlib:iq_query_info(Packet) of
#iq{type = Type, xmlns = XMLNS, lang = Lang,
- sub_el = SubEl} =
+ sub_el = #xmlel{name = SubElName} = SubEl} =
IQ
when (XMLNS == (?NS_MUC_ADMIN)) or
(XMLNS == (?NS_MUC_OWNER))
or (XMLNS == (?NS_DISCO_INFO))
or (XMLNS == (?NS_DISCO_ITEMS))
+ or (XMLNS == (?NS_VCARD))
or (XMLNS == (?NS_CAPTCHA)) ->
Res1 = case XMLNS of
?NS_MUC_ADMIN ->
@@ -434,6 +435,8 @@ normal_state({route, From, <<"">>,
process_iq_disco_info(From, Type, Lang, StateData);
?NS_DISCO_ITEMS ->
process_iq_disco_items(From, Type, Lang, StateData);
+ ?NS_VCARD ->
+ process_iq_vcard(From, Type, Lang, SubEl, StateData);
?NS_CAPTCHA ->
process_iq_captcha(From, Type, Lang, SubEl, StateData)
end,
@@ -441,7 +444,7 @@ normal_state({route, From, <<"">>,
{result, Res, SD} ->
{IQ#iq{type = result,
sub_el =
- [#xmlel{name = <<"query">>,
+ [#xmlel{name = SubElName,
attrs =
[{<<"xmlns">>,
XMLNS}],
@@ -1123,14 +1126,17 @@ process_presence(From, Nick,
end;
_ -> StateData
end,
+ close_room_if_temporary_and_empty(StateData1).
+
+close_room_if_temporary_and_empty(StateData1) ->
case not (StateData1#state.config)#config.persistent
andalso (?DICT):to_list(StateData1#state.users) == []
of
true ->
?INFO_MSG("Destroyed MUC room ~s because it's temporary "
"and empty",
- [jlib:jid_to_string(StateData#state.jid)]),
- add_to_log(room_existence, destroyed, StateData),
+ [jlib:jid_to_string(StateData1#state.jid)]),
+ add_to_log(room_existence, destroyed, StateData1),
{stop, normal, StateData1};
_ -> {next_state, normal_state, StateData1}
end.
@@ -3894,6 +3900,10 @@ set_opts([{Opt, Val} | Opts], StateData) ->
StateData#state{config =
(StateData#state.config)#config{max_users =
MaxUsers}};
+ vcard ->
+ StateData#state{config =
+ (StateData#state.config)#config{vcard =
+ Val}};
affiliations ->
StateData#state{affiliations = (?DICT):from_list(Val)};
subject -> StateData#state{subject = Val};
@@ -3926,6 +3936,7 @@ make_opts(StateData) ->
?MAKE_CONFIG_OPT(logging), ?MAKE_CONFIG_OPT(max_users),
?MAKE_CONFIG_OPT(allow_voice_requests),
?MAKE_CONFIG_OPT(voice_request_min_interval),
+ ?MAKE_CONFIG_OPT(vcard),
{captcha_whitelist,
(?SETS):to_list((StateData#state.config)#config.captcha_whitelist)},
{affiliations,
@@ -3992,6 +4003,8 @@ process_iq_disco_info(_From, get, Lang, StateData) ->
{<<"name">>, get_title(StateData)}],
children = []},
#xmlel{name = <<"feature">>,
+ attrs = [{<<"var">>, ?NS_VCARD}], children = []},
+ #xmlel{name = <<"feature">>,
attrs = [{<<"var">>, ?NS_MUC}], children = []},
?CONFIG_OPT_TO_FEATURE((Config#config.public),
<<"muc_public">>, <<"muc_hidden">>),
@@ -4064,6 +4077,26 @@ process_iq_captcha(_From, set, _Lang, SubEl,
_ -> {error, ?ERR_NOT_ACCEPTABLE}
end.
+process_iq_vcard(_From, get, _Lang, _SubEl, StateData) ->
+ #state{config = #config{vcard = VCardRaw}} = StateData,
+ case xml_stream:parse_element(VCardRaw) of
+ #xmlel{children = VCardEls} ->
+ {result, VCardEls, StateData};
+ {error, _} ->
+ {result, [], StateData}
+ end;
+process_iq_vcard(From, set, Lang, SubEl, StateData) ->
+ case get_affiliation(From, StateData) of
+ owner ->
+ VCardRaw = xml:element_to_binary(SubEl),
+ Config = StateData#state.config,
+ NewConfig = Config#config{vcard = VCardRaw},
+ change_config(NewConfig, StateData);
+ _ ->
+ ErrText = <<"Owner privileges required">>,
+ {error, ?ERRT_FORBIDDEN(Lang, ErrText)}
+ end.
+
get_title(StateData) ->
case (StateData#state.config)#config.title of
<<"">> -> StateData#state.room;
diff --git a/src/mod_offline.erl b/src/mod_offline.erl
index 77d333bd7..fca227d31 100644
--- a/src/mod_offline.erl
+++ b/src/mod_offline.erl
@@ -870,7 +870,7 @@ count_offline_messages(LUser, LServer) ->
case catch odbc_queries:count_records_where(
LServer, "spool",
<<"where username='", Username/binary, "'">>) of
- {selected, [_], [{Res}]} ->
+ {selected, [_], [[Res]]} ->
jlib:binary_to_integer(Res);
_ ->
0
diff --git a/src/mod_pubsub.erl b/src/mod_pubsub.erl
index 87d49cb54..168169a95 100644
--- a/src/mod_pubsub.erl
+++ b/src/mod_pubsub.erl
@@ -827,7 +827,7 @@ send_loop(State) ->
end;
(_) -> ok
end,
- Subscriptions)
+ lists:usort(Subscriptions))
end,
State#state.plugins),
if not State#state.ignore_pep_from_offline ->
@@ -1166,22 +1166,21 @@ disco_items(Host, Node, From) ->
%% presence hooks handling functions
%%
-caps_update(#jid{luser = U, lserver = S, lresource = R} = From, To, _Features) ->
- Pid = ejabberd_sm:get_session_pid(U, S, R),
- presence_probe(From, To, Pid).
-
-presence_probe(#jid{luser = User, lserver = Server, lresource = Resource} = JID,
- JID, Pid) ->
- presence(Server, {presence, JID, Pid}),
- presence(Server, {presence, User, Server, [Resource], JID});
-presence_probe(#jid{luser = User, lserver = Server},
- #jid{luser = User, lserver = Server}, _Pid) ->
- %% ignore presence_probe from other ressources for the current user
- %% this way, we do not send duplicated last items if user already connected with other clients
+caps_update(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Features)
+ when Host =/= S ->
+ presence(Host, {presence, U, S, [R], JID});
+caps_update(_From, _To, _Feature) ->
+ ok.
+
+presence_probe(#jid{luser = U, lserver = S, lresource = R} = JID, JID, Pid) ->
+ presence(S, {presence, JID, Pid}),
+ presence(S, {presence, U, S, [R], JID});
+presence_probe(#jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}, _Pid) ->
+ %% ignore presence_probe from my other ressources
+ %% to not get duplicated last items
ok;
-presence_probe(#jid{luser = User, lserver = Server, lresource = Resource},
- #jid{lserver = Host} = JID, _Pid) ->
- presence(Host, {presence, User, Server, [Resource], JID}).
+presence_probe(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Pid) ->
+ presence(Host, {presence, U, S, [R], JID}).
presence(ServerHost, Presence) ->
SendLoop = case
@@ -1621,7 +1620,7 @@ command_disco_info(_Host, ?NS_PUBSUB_GET_PENDING,
node_disco_info(Host, Node, From) ->
node_disco_info(Host, Node, From, true, true).
-node_disco_info(Host, Node, From, Identity, Features) ->
+node_disco_info(Host, Node, From, _Identity, _Features) ->
% Action =
% fun(#pubsub_node{type = Type, id = NodeId}) ->
% I = case Identity of
@@ -2952,10 +2951,7 @@ publish_item(Host, ServerHost, Node, Publisher, ItemId, Payload, Access) ->
PublishModel = get_option(Options, publish_model),
DeliverPayloads = get_option(Options, deliver_payloads),
PersistItems = get_option(Options, persist_items),
- MaxItems = case PersistItems of
- false -> 0;
- true -> max_items(Host, Options)
- end,
+ MaxItems = max_items(Host, Options),
PayloadCount = payload_xmlelements(Payload),
PayloadSize = byte_size(term_to_binary(Payload)) - 2,
PayloadMaxSize = get_option(Options, max_payload_size),
@@ -3359,6 +3355,8 @@ send_items(Host, Node, NodeId, Type, {U, S, R} = LJID,
_ -> []
end,
Stanza = case ToSend of
+ [] ->
+ undefined;
[LastItem] ->
{ModifNow, ModifUSR} =
LastItem#pubsub_item.modification,
@@ -3372,11 +3370,13 @@ send_items(Host, Node, NodeId, Type, {U, S, R} = LJID,
attrs = nodeAttr(Node),
children = itemsEls(ToSend)}])
end,
- case is_tuple(Host) of
- false ->
+ case {is_tuple(Host), Stanza} of
+ {_, undefined} ->
+ ok;
+ {false, _} ->
ejabberd_router:route(service_jid(Host),
jlib:make_jid(LJID), Stanza);
- true ->
+ {true, _} ->
case ejabberd_sm:get_session_pid(U, S, R) of
C2SPid when is_pid(C2SPid) ->
ejabberd_c2s:broadcast(C2SPid,
@@ -4156,7 +4156,7 @@ presence_can_deliver({User, Server, Resource}, true) ->
({session, _, _ , _, undefined, _}, _Acc) -> false;
({session, _, {_, _, R}, _, _Priority, _}, _Acc) ->
case Resource of
- [] -> true;
+ <<>> -> true;
R -> true;
_ -> false
end
@@ -4442,15 +4442,7 @@ broadcast_stanza(Host, _Node, _NodeId, _Type, NodeOptions, SubsByDepth, NotifyTy
broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, NodeId, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) ->
broadcast_stanza({LUser, LServer, LResource}, Node, NodeId, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM),
%% Handles implicit presence subscriptions
- SenderResource = case LResource of
- [] ->
- case user_resources(LUser, LServer) of
- [Resource|_] -> Resource;
- _ -> <<>>
- end;
- _ ->
- LResource
- end,
+ SenderResource = user_resource(LUser, LServer, LResource),
case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of
C2SPid when is_pid(C2SPid) ->
Stanza = case get_option(NodeOptions, notification_type, headline) of
@@ -4461,8 +4453,8 @@ broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, NodeId, Type, Nod
%% Also, add "replyto" if entity has presence subscription to the account owner
%% See XEP-0163 1.1 section 4.3.1
ejabberd_c2s:broadcast(C2SPid,
- {pep_message, binary_to_list(Node)++"+notify"},
- _Sender = jlib:make_jid(LUser, LServer, ""),
+ {pep_message, <<((Node))/binary, "+notify">>},
+ _Sender = jlib:make_jid(LUser, LServer, <<"">>),
_StanzaToSend = add_extended_headers(Stanza,
_ReplyTo = extended_headers([jlib:jid_to_string(Publisher)])));
_ ->
@@ -4527,6 +4519,13 @@ subscribed_nodes_by_jid(NotifyType, SubsByDepth) ->
user_resources(User, Server) ->
ejabberd_sm:get_user_resources(User, Server).
+user_resource(User, Server, <<>>) ->
+ case user_resources(User, Server) of
+ [R | _] -> R;
+ _ -> <<>>
+ end;
+user_resource(_, _, Resource) -> Resource.
+
%%%%%%% Configuration handling
%%<p>There are several reasons why the default node configuration options request might fail:</p>
diff --git a/src/mod_pubsub_odbc.erl b/src/mod_pubsub_odbc.erl
index 00e619213..45c30a11b 100644
--- a/src/mod_pubsub_odbc.erl
+++ b/src/mod_pubsub_odbc.erl
@@ -475,7 +475,7 @@ send_loop(State) ->
end;
(_) -> ok
end,
- Subscriptions)
+ lists:usort(Subscriptions))
end,
State#state.plugins),
if not State#state.ignore_pep_from_offline ->
@@ -817,22 +817,21 @@ disco_items(Host, Node, From) ->
%% presence hooks handling functions
%%
-caps_update(#jid{luser = U, lserver = S, lresource = R} = From, To, _Features) ->
- Pid = ejabberd_sm:get_session_pid(U, S, R),
- presence_probe(From, To, Pid).
-
-presence_probe(#jid{luser = User, lserver = Server, lresource = Resource} = JID,
- JID, Pid) ->
- presence(Server, {presence, JID, Pid}),
- presence(Server, {presence, User, Server, [Resource], JID});
-presence_probe(#jid{luser = User, lserver = Server},
- #jid{luser = User, lserver = Server}, _Pid) ->
- %% ignore presence_probe from other ressources for the current user
- %% this way, we do not send duplicated last items if user already connected with other clients
+caps_update(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Features)
+ when Host =/= S ->
+ presence(Host, {presence, U, S, [R], JID});
+caps_update(_From, _To, _Feature) ->
+ ok.
+
+presence_probe(#jid{luser = U, lserver = S, lresource = R} = JID, JID, Pid) ->
+ presence(S, {presence, JID, Pid}),
+ presence(S, {presence, U, S, [R], JID});
+presence_probe(#jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}, _Pid) ->
+ %% ignore presence_probe from my other ressources
+ %% to not get duplicated last items
ok;
-presence_probe(#jid{luser = User, lserver = Server, lresource = Resource},
- #jid{lserver = Host} = JID, _Pid) ->
- presence(Host, {presence, User, Server, [Resource], JID}).
+presence_probe(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Pid) ->
+ presence(Host, {presence, U, S, [R], JID}).
presence(ServerHost, Presence) ->
SendLoop = case
@@ -1273,7 +1272,7 @@ command_disco_info(_Host, ?NS_PUBSUB_GET_PENDING,
node_disco_info(Host, Node, From) ->
node_disco_info(Host, Node, From, true, true).
-node_disco_info(Host, Node, From, Identity, Features) ->
+node_disco_info(Host, Node, From, _Identity, _Features) ->
% Action =
% fun(#pubsub_node{type = Type, id = NodeId}) ->
% I = case Identity of
@@ -2618,9 +2617,9 @@ publish_item(Host, ServerHost, Node, Publisher, ItemId, Payload, Access) ->
Features = features(Type),
PublishFeature = lists:member(<<"publish">>, Features),
PublishModel = get_option(Options, publish_model),
- MaxItems = max_items(Host, Options),
DeliverPayloads = get_option(Options, deliver_payloads),
PersistItems = get_option(Options, persist_items),
+ MaxItems = max_items(Host, Options),
PayloadCount = payload_xmlelements(Payload),
PayloadSize = byte_size(term_to_binary(Payload)) - 2,
PayloadMaxSize = get_option(Options, max_payload_size),
@@ -3013,7 +3012,7 @@ send_items(Host, Node, NodeId, Type, LJID, last) ->
ModifNow, ModifUSR)
end,
ejabberd_router:route(service_jid(Host), jlib:make_jid(LJID), Stanza);
-send_items(Host, Node, NodeId, Type, LJID, Number) ->
+send_items(Host, Node, NodeId, Type, {U, S, R} = LJID, Number) ->
ToSend = case node_action(Host, Type, get_items,
[NodeId, LJID])
of
@@ -3026,6 +3025,8 @@ send_items(Host, Node, NodeId, Type, LJID, Number) ->
_ -> []
end,
Stanza = case ToSend of
+ [] ->
+ undefined;
[LastItem] ->
{ModifNow, ModifUSR} =
LastItem#pubsub_item.modification,
@@ -3039,7 +3040,22 @@ send_items(Host, Node, NodeId, Type, LJID, Number) ->
attrs = nodeAttr(Node),
children = itemsEls(ToSend)}])
end,
- ejabberd_router:route(service_jid(Host), jlib:make_jid(LJID), Stanza).
+ case {is_tuple(Host), Stanza} of
+ {_, undefined} ->
+ ok;
+ {false, _} ->
+ ejabberd_router:route(service_jid(Host),
+ jlib:make_jid(LJID), Stanza);
+ {true, _} ->
+ case ejabberd_sm:get_session_pid(U, S, R) of
+ C2SPid when is_pid(C2SPid) ->
+ ejabberd_c2s:broadcast(C2SPid,
+ {pep_message,
+ <<((Node))/binary, "+notify">>},
+ _Sender = service_jid(Host), Stanza);
+ _ -> ok
+ end
+ end.
%% @spec (Host, JID, Plugins) -> {error, Reason} | {result, Response}
%% Host = host()
@@ -3235,8 +3251,7 @@ set_affiliations(Host, Node, From, EntitiesEls) ->
error -> {error, ?ERR_BAD_REQUEST};
_ ->
Action = fun (#pubsub_node{type = Type,
- id = NodeId} =
- N) ->
+ id = NodeId}) ->
Owners = node_owners_call(Type, NodeId),
case lists:member(Owner, Owners) of
true ->
@@ -4062,15 +4077,7 @@ broadcast_stanza(Host, _Node, _NodeId, _Type, NodeOptions, SubsByDepth, NotifyTy
broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, NodeId, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) ->
broadcast_stanza({LUser, LServer, LResource}, Node, NodeId, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM),
%% Handles implicit presence subscriptions
- SenderResource = case LResource of
- [] ->
- case user_resources(LUser, LServer) of
- [Resource|_] -> Resource;
- _ -> <<>>
- end;
- _ ->
- LResource
- end,
+ SenderResource = user_resource(LUser, LServer, LResource),
case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of
C2SPid when is_pid(C2SPid) ->
Stanza = case get_option(NodeOptions, notification_type, headline) of
@@ -4081,8 +4088,8 @@ broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, NodeId, Type, Nod
%% Also, add "replyto" if entity has presence subscription to the account owner
%% See XEP-0163 1.1 section 4.3.1
ejabberd_c2s:broadcast(C2SPid,
- {pep_message, binary_to_list(Node)++"+notify"},
- _Sender = jlib:make_jid(LUser, LServer, ""),
+ {pep_message, <<((Node))/binary, "+notify">>},
+ _Sender = jlib:make_jid(LUser, LServer, <<"">>),
_StanzaToSend = add_extended_headers(Stanza,
_ReplyTo = extended_headers([jlib:jid_to_string(Publisher)])));
_ ->
@@ -4147,6 +4154,13 @@ subscribed_nodes_by_jid(NotifyType, SubsByDepth) ->
user_resources(User, Server) ->
ejabberd_sm:get_user_resources(User, Server).
+user_resource(User, Server, <<>>) ->
+ case user_resources(User, Server) of
+ [R | _] -> R;
+ _ -> <<>>
+ end;
+user_resource(_, _, Resource) -> Resource.
+
%%%%%%% Configuration handling
%%<p>There are several reasons why the default node configuration options request might fail:</p>
diff --git a/src/mod_sip.erl b/src/mod_sip.erl
new file mode 100644
index 000000000..8ed4ed8cf
--- /dev/null
+++ b/src/mod_sip.erl
@@ -0,0 +1,279 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2014, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 21 Apr 2014 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(mod_sip).
+
+-behaviour(gen_mod).
+-behaviour(esip).
+
+%% API
+-export([start/2, stop/1, prepare_request/1, make_response/2, at_my_host/1]).
+
+%% esip_callbacks
+-export([data_in/2, data_out/2, message_in/2, message_out/2,
+ request/2, request/3, response/2, locate/1]).
+
+-include("ejabberd.hrl").
+-include("logger.hrl").
+-include("esip.hrl").
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+start(_Host, _Opts) ->
+ ejabberd:start_app(esip),
+ esip:set_config_value(max_server_transactions, 10000),
+ esip:set_config_value(max_client_transactions, 10000),
+ esip:set_config_value(software, <<"ejabberd ", (?VERSION)/binary>>),
+ esip:set_config_value(module, ?MODULE),
+ Spec = {mod_sip_registrar, {mod_sip_registrar, start_link, []},
+ transient, 2000, worker, [mod_sip_registrar]},
+ TmpSupSpec = {mod_sip_proxy_sup,
+ {ejabberd_tmp_sup, start_link,
+ [mod_sip_proxy_sup, mod_sip_proxy]},
+ permanent, infinity, supervisor, [ejabberd_tmp_sup]},
+ supervisor:start_child(ejabberd_sup, Spec),
+ supervisor:start_child(ejabberd_sup, TmpSupSpec),
+ ok.
+
+stop(_Host) ->
+ ok.
+
+data_in(Data, #sip_socket{type = Transport,
+ addr = {MyIP, MyPort},
+ peer = {PeerIP, PeerPort}}) ->
+ ?DEBUG(
+ "SIP [~p/in] ~s:~p -> ~s:~p:~n~s",
+ [Transport, inet_parse:ntoa(PeerIP), PeerPort,
+ inet_parse:ntoa(MyIP), MyPort, Data]).
+
+data_out(Data, #sip_socket{type = Transport,
+ addr = {MyIP, MyPort},
+ peer = {PeerIP, PeerPort}}) ->
+ ?DEBUG(
+ "SIP [~p/out] ~s:~p -> ~s:~p:~n~s",
+ [Transport, inet_parse:ntoa(MyIP), MyPort,
+ inet_parse:ntoa(PeerIP), PeerPort, Data]).
+
+message_in(#sip{type = request, method = M} = Req, SIPSock)
+ when M /= <<"ACK">>, M /= <<"CANCEL">> ->
+ case action(Req, SIPSock) of
+ {relay, _LServer} ->
+ ok;
+ Action ->
+ request(Req, SIPSock, undefined, Action)
+ end;
+message_in(_, _) ->
+ ok.
+
+message_out(_, _) ->
+ ok.
+
+response(_Resp, _SIPSock) ->
+ ok.
+
+request(_Req, _SIPSock) ->
+ error.
+
+request(Req, SIPSock, TrID) ->
+ request(Req, SIPSock, TrID, action(Req, SIPSock)).
+
+request(Req, SIPSock, TrID, Action) ->
+ case Action of
+ to_me ->
+ process(Req, SIPSock);
+ register ->
+ mod_sip_registrar:request(Req, SIPSock);
+ loop ->
+ make_response(Req, #sip{status = 483, type = response});
+ {unsupported, Require} ->
+ make_response(Req, #sip{status = 420,
+ type = response,
+ hdrs = [{'unsupported',
+ Require}]});
+ {relay, LServer} ->
+ case mod_sip_proxy:start(LServer, []) of
+ {ok, Pid} ->
+ mod_sip_proxy:route(Req, SIPSock, TrID, Pid),
+ {mod_sip_proxy, route, [Pid]};
+ Err ->
+ ?INFO_MSG("failed to proxy request ~p: ~p", [Req, Err]),
+ Err
+ end;
+ {proxy_auth, Host} ->
+ make_response(
+ Req,
+ #sip{status = 407,
+ type = response,
+ hdrs = [{'proxy-authenticate',
+ make_auth_hdr(Host)}]});
+ {auth, Host} ->
+ make_response(
+ Req,
+ #sip{status = 401,
+ type = response,
+ hdrs = [{'www-authenticate',
+ make_auth_hdr(Host)}]});
+ deny ->
+ make_response(Req, #sip{status = 403,
+ type = response});
+ not_found ->
+ make_response(Req, #sip{status = 480,
+ type = response})
+ end.
+
+locate(_SIPMsg) ->
+ ok.
+
+find(#uri{user = User, host = Host}) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Host),
+ if LUser == <<"">> ->
+ to_me;
+ true ->
+ case mod_sip_registrar:find_sockets(LUser, LServer) of
+ [] ->
+ not_found;
+ [_|_] ->
+ {relay, LServer}
+ end
+ end.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+action(#sip{method = <<"REGISTER">>, type = request, hdrs = Hdrs,
+ uri = #uri{user = <<"">>} = URI} = Req, SIPSock) ->
+ case at_my_host(URI) of
+ true ->
+ case esip:get_hdrs('require', Hdrs) of
+ [_|_] = Require ->
+ {unsupported, Require};
+ _ ->
+ {_, ToURI, _} = esip:get_hdr('to', Hdrs),
+ case at_my_host(ToURI) of
+ true ->
+ case check_auth(Req, 'authorization', SIPSock) of
+ true ->
+ register;
+ false ->
+ {auth, ToURI#uri.host}
+ end;
+ false ->
+ deny
+ end
+ end;
+ false ->
+ deny
+ end;
+action(#sip{method = Method, hdrs = Hdrs, type = request} = Req, SIPSock) ->
+ case esip:get_hdr('max-forwards', Hdrs) of
+ 0 when Method == <<"OPTIONS">> ->
+ to_me;
+ 0 ->
+ loop;
+ _ ->
+ case esip:get_hdrs('proxy-require', Hdrs) of
+ [_|_] = Require ->
+ {unsupported, Require};
+ _ ->
+ {_, ToURI, _} = esip:get_hdr('to', Hdrs),
+ {_, FromURI, _} = esip:get_hdr('from', Hdrs),
+ case at_my_host(FromURI) of
+ true ->
+ case check_auth(Req, 'proxy-authorization', SIPSock) of
+ true ->
+ case at_my_host(ToURI) of
+ true ->
+ find(ToURI);
+ false ->
+ LServer = jlib:nameprep(FromURI#uri.host),
+ {relay, LServer}
+ end;
+ false ->
+ {proxy_auth, FromURI#uri.host}
+ end;
+ false ->
+ case at_my_host(ToURI) of
+ true ->
+ find(ToURI);
+ false ->
+ deny
+ end
+ end
+ end
+ end.
+
+check_auth(#sip{method = <<"CANCEL">>}, _, _SIPSock) ->
+ true;
+check_auth(#sip{method = Method, hdrs = Hdrs, body = Body}, AuthHdr, _SIPSock) ->
+ Issuer = case AuthHdr of
+ 'authorization' ->
+ to;
+ 'proxy-authorization' ->
+ from
+ end,
+ {_, #uri{user = User, host = Host}, _} = esip:get_hdr(Issuer, Hdrs),
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Host),
+ case lists:filter(
+ fun({_, Params}) ->
+ Username = esip:get_param(<<"username">>, Params),
+ Realm = esip:get_param(<<"realm">>, Params),
+ (LUser == esip:unquote(Username))
+ and (LServer == esip:unquote(Realm))
+ end, esip:get_hdrs(AuthHdr, Hdrs)) of
+ [Auth|_] ->
+ case ejabberd_auth:get_password_s(LUser, LServer) of
+ <<"">> ->
+ false;
+ Password ->
+ esip:check_auth(Auth, Method, Body, Password)
+ end;
+ [] ->
+ false
+ end.
+
+allow() ->
+ [<<"OPTIONS">>, <<"REGISTER">>].
+
+process(#sip{method = <<"OPTIONS">>} = Req, _) ->
+ make_response(Req, #sip{type = response, status = 200,
+ hdrs = [{'allow', allow()}]});
+process(#sip{method = <<"REGISTER">>} = Req, _) ->
+ make_response(Req, #sip{type = response, status = 400});
+process(Req, _) ->
+ make_response(Req, #sip{type = response, status = 405,
+ hdrs = [{'allow', allow()}]}).
+
+prepare_request(#sip{hdrs = Hdrs1} = Req) ->
+ MF = esip:get_hdr('max-forwards', Hdrs1),
+ Hdrs2 = esip:set_hdr('max-forwards', MF-1, Hdrs1),
+ Hdrs3 = lists:filter(
+ fun({'proxy-authorization', {_, Params}}) ->
+ Realm = esip:unquote(esip:get_param(<<"realm">>, Params)),
+ not is_my_host(jlib:nameprep(Realm));
+ (_) ->
+ true
+ end, Hdrs2),
+ Req#sip{hdrs = Hdrs3}.
+
+make_auth_hdr(LServer) ->
+ Realm = jlib:nameprep(LServer),
+ {<<"Digest">>, [{<<"realm">>, esip:quote(Realm)},
+ {<<"qop">>, esip:quote(<<"auth">>)},
+ {<<"nonce">>, esip:quote(esip:make_hexstr(20))}]}.
+
+make_response(Req, Resp) ->
+ esip:make_response(Req, Resp, esip:make_tag()).
+
+at_my_host(#uri{host = Host}) ->
+ is_my_host(jlib:nameprep(Host)).
+
+is_my_host(LServer) ->
+ gen_mod:is_loaded(LServer, ?MODULE).
diff --git a/src/mod_sip_proxy.erl b/src/mod_sip_proxy.erl
new file mode 100644
index 000000000..cae75bff8
--- /dev/null
+++ b/src/mod_sip_proxy.erl
@@ -0,0 +1,277 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2014, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 21 Apr 2014 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(mod_sip_proxy).
+
+-define(GEN_FSM, p1_fsm).
+-behaviour(?GEN_FSM).
+
+%% API
+-export([start/2, start_link/2, route/4]).
+
+%% gen_fsm callbacks
+-export([init/1, wait_for_request/2, wait_for_response/2,
+ handle_event/3, handle_sync_event/4,
+ handle_info/3, terminate/3, code_change/4]).
+
+-include("ejabberd.hrl").
+-include("logger.hrl").
+-include("esip.hrl").
+
+-record(state, {host = <<"">> :: binary(),
+ opts = [] :: [{certfile, binary()}],
+ orig_trid,
+ responses = [] :: [#sip{}],
+ tr_ids = [] :: list(),
+ orig_req :: #sip{}}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+start(LServer, Opts) ->
+ supervisor:start_child(mod_sip_proxy_sup, [LServer, Opts]).
+
+start_link(LServer, Opts) ->
+ ?GEN_FSM:start_link(?MODULE, [LServer, Opts], []).
+
+route(SIPMsg, _SIPSock, TrID, Pid) ->
+ ?GEN_FSM:send_event(Pid, {SIPMsg, TrID}).
+
+%%%===================================================================
+%%% gen_fsm callbacks
+%%%===================================================================
+init([Host, Opts]) ->
+ Opts1 = add_certfile(Host, Opts),
+ {ok, wait_for_request, #state{opts = Opts1, host = Host}}.
+
+wait_for_request({#sip{type = request} = Req, TrID}, State) ->
+ Opts = State#state.opts,
+ Req1 = mod_sip:prepare_request(Req),
+ case connect(Req1, Opts) of
+ {ok, SIPSockets} ->
+ NewState =
+ lists:foldl(
+ fun(_SIPSocket, {error, _} = Err) ->
+ Err;
+ (SIPSocket, #state{tr_ids = TrIDs} = AccState) ->
+ Req2 = add_via(SIPSocket, State#state.host, Req1),
+ case esip:request(SIPSocket, Req2,
+ {?MODULE, route, [self()]}) of
+ {ok, ClientTrID} ->
+ NewTrIDs = [ClientTrID|TrIDs],
+ AccState#state{tr_ids = NewTrIDs};
+ Err ->
+ cancel_pending_transactions(AccState),
+ Err
+ end
+ end, State, SIPSockets),
+ case NewState of
+ {error, _} = Err ->
+ {Status, Reason} = esip:error_status(Err),
+ esip:reply(TrID, mod_sip:make_response(
+ Req, #sip{type = response,
+ status = Status,
+ reason = Reason})),
+ {stop, normal, State};
+ _ ->
+ {next_state, wait_for_response,
+ NewState#state{orig_req = Req, orig_trid = TrID}}
+ end;
+ {error, notfound} ->
+ esip:reply(TrID, mod_sip:make_response(
+ Req, #sip{type = response,
+ status = 480,
+ reason = esip:reason(480)})),
+ {stop, normal, State};
+ Err ->
+ {Status, Reason} = esip:error_status(Err),
+ esip:reply(TrID, mod_sip:make_response(
+ Req, #sip{type = response,
+ status = Status,
+ reason = Reason})),
+ {stop, normal, State}
+ end;
+wait_for_request(_Event, State) ->
+ {next_state, wait_for_request, State}.
+
+wait_for_response({#sip{method = <<"CANCEL">>, type = request}, _TrID}, State) ->
+ cancel_pending_transactions(State),
+ {next_state, wait_for_response, State};
+wait_for_response({Resp, TrID},
+ #state{orig_req = #sip{method = Method} = Req} = State) ->
+ case Resp of
+ {error, timeout} when Method /= <<"INVITE">> ->
+ %% Absorb useless 408. See RFC4320
+ choose_best_response(State),
+ esip:stop_transaction(State#state.orig_trid),
+ {stop, normal, State};
+ {error, _} ->
+ {Status, Reason} = esip:error_status(Resp),
+ State1 = mark_transaction_as_complete(TrID, State),
+ SIPResp = mod_sip:make_response(Req,
+ #sip{type = response,
+ status = Status,
+ reason = Reason}),
+ State2 = collect_response(SIPResp, State1),
+ case State2#state.tr_ids of
+ [] ->
+ choose_best_response(State2),
+ {stop, normal, State2};
+ _ ->
+ {next_state, wait_for_response, State2}
+ end;
+ #sip{status = 100} ->
+ {next_state, wait_for_response, State};
+ #sip{status = Status} ->
+ {[_|Vias], NewHdrs} = esip:split_hdrs('via', Resp#sip.hdrs),
+ NewResp = case Vias of
+ [] ->
+ Resp#sip{hdrs = NewHdrs};
+ _ ->
+ Resp#sip{hdrs = [{'via', Vias}|NewHdrs]}
+ end,
+ if Status < 300 ->
+ esip:reply(State#state.orig_trid, NewResp);
+ true ->
+ ok
+ end,
+ State1 = if Status >= 200 ->
+ mark_transaction_as_complete(TrID, State);
+ true ->
+ State
+ end,
+ State2 = if Status >= 300 ->
+ collect_response(NewResp, State1);
+ true ->
+ State1
+ end,
+ if Status >= 600 ->
+ cancel_pending_transactions(State2);
+ true ->
+ ok
+ end,
+ case State2#state.tr_ids of
+ [] ->
+ choose_best_response(State2),
+ {stop, normal, State2};
+ _ ->
+ {next_state, wait_for_response, State2}
+ end
+ end;
+wait_for_response(_Event, State) ->
+ {next_state, wait_for_response, State}.
+
+handle_event(_Event, StateName, State) ->
+ {next_state, StateName, State}.
+
+handle_sync_event(_Event, _From, StateName, State) ->
+ Reply = ok,
+ {reply, Reply, StateName, State}.
+
+handle_info(_Info, StateName, State) ->
+ {next_state, StateName, State}.
+
+terminate(_Reason, _StateName, _State) ->
+ ok.
+
+code_change(_OldVsn, StateName, State, _Extra) ->
+ {ok, StateName, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+connect(#sip{hdrs = Hdrs} = Req, Opts) ->
+ {_, ToURI, _} = esip:get_hdr('to', Hdrs),
+ case mod_sip:at_my_host(ToURI) of
+ true ->
+ LUser = jlib:nodeprep(ToURI#uri.user),
+ LServer = jlib:nameprep(ToURI#uri.host),
+ case mod_sip_registrar:find_sockets(LUser, LServer) of
+ [_|_] = SIPSocks ->
+ {ok, SIPSocks};
+ [] ->
+ {error, notfound}
+ end;
+ false ->
+ case esip:connect(Req, Opts) of
+ {ok, SIPSock} ->
+ {ok, [SIPSock]};
+ {error, _} = Err ->
+ Err
+ end
+ end.
+
+cancel_pending_transactions(State) ->
+ lists:foreach(fun esip:cancel/1, State#state.tr_ids).
+
+add_certfile(LServer, Opts) ->
+ case ejabberd_config:get_option({domain_certfile, LServer},
+ fun iolist_to_binary/1) of
+ CertFile when is_binary(CertFile), CertFile /= <<"">> ->
+ [{certfile, CertFile}|Opts];
+ _ ->
+ Opts
+ end.
+
+add_via(#sip_socket{type = Transport}, LServer, #sip{hdrs = Hdrs} = Req) ->
+ ConfiguredVias = get_configured_vias(LServer),
+ {ViaHost, ViaPort} = proplists:get_value(
+ Transport, ConfiguredVias, {LServer, undefined}),
+ ViaTransport = case Transport of
+ tls -> <<"TLS">>;
+ tcp -> <<"TCP">>;
+ udp -> <<"UDP">>
+ end,
+ Via = #via{transport = ViaTransport,
+ host = ViaHost,
+ port = ViaPort,
+ params = [{<<"branch">>, esip:make_branch()},
+ {<<"rport">>, <<"">>}]},
+ Req#sip{hdrs = [{'via', [Via]}|Hdrs]}.
+
+get_configured_vias(LServer) ->
+ gen_mod:get_module_opt(
+ LServer, ?MODULE, via,
+ fun(L) ->
+ lists:map(
+ fun(Opts) ->
+ Type = proplists:get_value(type, Opts),
+ Host = proplists:get_value(host, Opts),
+ Port = proplists:get_value(port, Opts),
+ true = (Type == tcp) or (Type == tls) or (Type == udp),
+ true = is_binary(Host) and (Host /= <<"">>),
+ true = (is_integer(Port)
+ and (Port > 0) and (Port < 65536))
+ or (Port == undefined),
+ {Type, {Host, Port}}
+ end, L)
+ end, []).
+
+mark_transaction_as_complete(TrID, State) ->
+ NewTrIDs = lists:delete(TrID, State#state.tr_ids),
+ State#state{tr_ids = NewTrIDs}.
+
+collect_response(Resp, #state{responses = Resps} = State) ->
+ State#state{responses = [Resp|Resps]}.
+
+choose_best_response(#state{responses = Responses} = State) ->
+ SortedResponses = lists:keysort(#sip.status, Responses),
+ case lists:filter(
+ fun(#sip{status = Status}) ->
+ Status >= 600
+ end, SortedResponses) of
+ [Resp|_] ->
+ esip:reply(State#state.orig_trid, Resp);
+ [] ->
+ case SortedResponses of
+ [Resp|_] ->
+ esip:reply(State#state.orig_trid, Resp);
+ [] ->
+ ok
+ end
+ end.
diff --git a/src/mod_sip_registrar.erl b/src/mod_sip_registrar.erl
new file mode 100644
index 000000000..57c55be08
--- /dev/null
+++ b/src/mod_sip_registrar.erl
@@ -0,0 +1,336 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2014, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 23 Apr 2014 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(mod_sip_registrar).
+
+-define(GEN_SERVER, p1_server).
+-behaviour(?GEN_SERVER).
+
+%% API
+-export([start_link/0, request/2, find_sockets/2]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include("ejabberd.hrl").
+-include("logger.hrl").
+-include("esip.hrl").
+
+-define(CALL_TIMEOUT, timer:seconds(30)).
+
+-record(binding, {socket = #sip_socket{},
+ call_id = <<"">> :: binary(),
+ cseq = 0 :: non_neg_integer(),
+ timestamp = now() :: erlang:timestamp(),
+ tref = make_ref() :: reference(),
+ expires = 0 :: non_neg_integer()}).
+
+-record(sip_session, {us = {<<"">>, <<"">>} :: {binary(), binary()},
+ bindings = [] :: [#binding{}]}).
+
+-record(state, {}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+start_link() ->
+ ?GEN_SERVER:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+request(#sip{hdrs = Hdrs} = Req, SIPSock) ->
+ {_, #uri{user = U, host = S}, _} = esip:get_hdr('to', Hdrs),
+ LUser = jlib:nodeprep(U),
+ LServer = jlib:nameprep(S),
+ {PeerIP, _} = SIPSock#sip_socket.peer,
+ US = {LUser, LServer},
+ CallID = esip:get_hdr('call-id', Hdrs),
+ CSeq = esip:get_hdr('cseq', Hdrs),
+ Expires = esip:get_hdr('expires', Hdrs, 0),
+ case esip:get_hdrs('contact', Hdrs) of
+ [<<"*">>] when Expires == 0 ->
+ case unregister_session(US, SIPSock, CallID, CSeq) of
+ ok ->
+ ?INFO_MSG("unregister SIP session for user ~s@~s from ~s",
+ [LUser, LServer, inet_parse:ntoa(PeerIP)]),
+ mod_sip:make_response(
+ Req, #sip{type = response, status = 200});
+ {error, Why} ->
+ {Status, Reason} = make_status(Why),
+ mod_sip:make_response(
+ Req, #sip{type = response,
+ status = Status,
+ reason = Reason})
+ end;
+ [{_, _URI, _Params}|_] = Contacts ->
+ ExpiresList = lists:map(
+ fun({_, _, Params}) ->
+ case to_integer(
+ esip:get_param(
+ <<"expires">>, Params),
+ 0, (1 bsl 32)-1) of
+ {ok, E} -> E;
+ _ -> Expires
+ end
+ end, Contacts),
+ Expires1 = lists:max(ExpiresList),
+ Contact = {<<"">>, #uri{user = LUser, host = LServer},
+ [{<<"expires">>, jlib:integer_to_binary(Expires1)}]},
+ MinExpires = min_expires(),
+ if Expires1 >= MinExpires ->
+ case register_session(US, SIPSock, CallID, CSeq, Expires1) of
+ ok ->
+ ?INFO_MSG("register SIP session for user ~s@~s from ~s",
+ [LUser, LServer, inet_parse:ntoa(PeerIP)]),
+ mod_sip:make_response(
+ Req,
+ #sip{type = response,
+ status = 200,
+ hdrs = [{'contact', [Contact]}]});
+ {error, Why} ->
+ {Status, Reason} = make_status(Why),
+ mod_sip:make_response(
+ Req, #sip{type = response,
+ status = Status,
+ reason = Reason})
+ end;
+ Expires1 > 0, Expires1 < MinExpires ->
+ mod_sip:make_response(
+ Req, #sip{type = response,
+ status = 423,
+ hdrs = [{'min-expires', MinExpires}]});
+ true ->
+ case unregister_session(US, SIPSock, CallID, CSeq) of
+ ok ->
+ ?INFO_MSG("unregister SIP session for user ~s@~s from ~s",
+ [LUser, LServer, inet_parse:ntoa(PeerIP)]),
+ mod_sip:make_response(
+ Req,
+ #sip{type = response, status = 200,
+ hdrs = [{'contact', [Contact]}]});
+ {error, Why} ->
+ {Status, Reason} = make_status(Why),
+ mod_sip:make_response(
+ Req, #sip{type = response,
+ status = Status,
+ reason = Reason})
+ end
+ end;
+ [] ->
+ case mnesia:dirty_read(sip_session, US) of
+ [#sip_session{bindings = Bindings}] ->
+ case pop_previous_binding(SIPSock, Bindings) of
+ {ok, #binding{expires = Expires1}, _} ->
+ Contact = {<<"">>,
+ #uri{user = LUser, host = LServer},
+ [{<<"expires">>,
+ jlib:integer_to_binary(Expires1)}]},
+ mod_sip:make_response(
+ Req, #sip{type = response, status = 200,
+ hdrs = [{'contact', [Contact]}]});
+ {error, notfound} ->
+ {Status, Reason} = make_status(notfound),
+ mod_sip:make_response(
+ Req, #sip{type = response,
+ status = Status,
+ reason = Reason})
+ end;
+ [] ->
+ {Status, Reason} = make_status(notfound),
+ mod_sip:make_response(
+ Req, #sip{type = response,
+ status = Status,
+ reason = Reason})
+ end;
+ _ ->
+ mod_sip:make_response(Req, #sip{type = response, status = 400})
+ end.
+
+find_sockets(U, S) ->
+ case mnesia:dirty_read(sip_session, {U, S}) of
+ [#sip_session{bindings = Bindings}] ->
+ [Binding#binding.socket || Binding <- Bindings];
+ [] ->
+ []
+ end.
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+init([]) ->
+ mnesia:create_table(sip_session,
+ [{ram_copies, [node()]},
+ {attributes, record_info(fields, sip_session)}]),
+ mnesia:add_table_copy(sip_session, node(), ram_copies),
+ {ok, #state{}}.
+
+handle_call({write, Session}, _From, State) ->
+ Res = write_session(Session),
+ {reply, Res, State};
+handle_call({delete, US, SIPSocket, CallID, CSeq}, _From, State) ->
+ Res = delete_session(US, SIPSocket, CallID, CSeq),
+ {reply, Res, State};
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({write, Session}, State) ->
+ write_session(Session),
+ {noreply, State};
+handle_info({delete, US, SIPSocket, CallID, CSeq}, State) ->
+ delete_session(US, SIPSocket, CallID, CSeq),
+ {noreply, State};
+handle_info({timeout, TRef, US}, State) ->
+ delete_expired_session(US, TRef),
+ {noreply, State};
+handle_info(_Info, State) ->
+ ?ERROR_MSG("got unexpected info: ~p", [_Info]),
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+register_session(US, SIPSocket, CallID, CSeq, Expires) ->
+ Session = #sip_session{us = US,
+ bindings = [#binding{socket = SIPSocket,
+ call_id = CallID,
+ cseq = CSeq,
+ timestamp = now(),
+ expires = Expires}]},
+ call({write, Session}).
+
+unregister_session(US, SIPSocket, CallID, CSeq) ->
+ Msg = {delete, US, SIPSocket, CallID, CSeq},
+ call(Msg).
+
+write_session(#sip_session{us = {U, S} = US,
+ bindings = [#binding{socket = SIPSocket,
+ call_id = CallID,
+ expires = Expires,
+ cseq = CSeq} = Binding]}) ->
+ case mnesia:dirty_read(sip_session, US) of
+ [#sip_session{bindings = Bindings}] ->
+ case pop_previous_binding(SIPSocket, Bindings) of
+ {ok, #binding{call_id = CallID, cseq = PrevCSeq}, _}
+ when PrevCSeq > CSeq ->
+ {error, cseq_out_of_order};
+ {ok, #binding{tref = Tref}, Bindings1} ->
+ erlang:cancel_timer(Tref),
+ NewTRef = erlang:start_timer(Expires * 1000, self(), US),
+ NewBindings = [Binding#binding{tref = NewTRef}|Bindings1],
+ mnesia:dirty_write(
+ #sip_session{us = US, bindings = NewBindings});
+ {error, notfound} ->
+ MaxSessions = ejabberd_sm:get_max_user_sessions(U, S),
+ if length(Bindings) < MaxSessions ->
+ NewTRef = erlang:start_timer(Expires * 1000, self(), US),
+ NewBindings = [Binding#binding{tref = NewTRef}|Bindings],
+ mnesia:dirty_write(
+ #sip_session{us = US, bindings = NewBindings});
+ true ->
+ {error, too_many_sessions}
+ end
+ end;
+ [] ->
+ NewTRef = erlang:start_timer(Expires * 1000, self(), US),
+ NewBindings = [Binding#binding{tref = NewTRef}],
+ mnesia:dirty_write(#sip_session{us = US, bindings = NewBindings})
+ end.
+
+delete_session(US, SIPSocket, CallID, CSeq) ->
+ case mnesia:dirty_read(sip_session, US) of
+ [#sip_session{bindings = Bindings}] ->
+ case pop_previous_binding(SIPSocket, Bindings) of
+ {ok, #binding{call_id = CallID, cseq = PrevCSeq}, _}
+ when PrevCSeq > CSeq ->
+ {error, cseq_out_of_order};
+ {ok, #binding{tref = TRef}, []} ->
+ erlang:cancel_timer(TRef),
+ mnesia:dirty_delete(sip_session, US);
+ {ok, #binding{tref = TRef}, NewBindings} ->
+ erlang:cancel_timer(TRef),
+ mnesia:dirty_write(sip_session,
+ #sip_session{us = US,
+ bindings = NewBindings});
+ {error, notfound} ->
+ {error, notfound}
+ end;
+ [] ->
+ {error, notfound}
+ end.
+
+delete_expired_session(US, TRef) ->
+ case mnesia:dirty_read(sip_session, US) of
+ [#sip_session{bindings = Bindings}] ->
+ case lists:filter(
+ fun(#binding{tref = TRef1}) when TRef1 == TRef ->
+ false;
+ (_) ->
+ true
+ end, Bindings) of
+ [] ->
+ mnesia:dirty_delete(sip_session, US);
+ NewBindings ->
+ mnesia:dirty_write(sip_session,
+ #sip_session{us = US,
+ bindings = NewBindings})
+ end;
+ [] ->
+ ok
+ end.
+
+min_expires() ->
+ 60.
+
+to_integer(Bin, Min, Max) ->
+ case catch list_to_integer(binary_to_list(Bin)) of
+ N when N >= Min, N =< Max ->
+ {ok, N};
+ _ ->
+ error
+ end.
+
+pop_previous_binding(#sip_socket{peer = Peer}, Bindings) ->
+ case lists:partition(
+ fun(#binding{socket = #sip_socket{peer = Peer1}}) ->
+ Peer1 == Peer
+ end, Bindings) of
+ {[Binding], RestBindings} ->
+ {ok, Binding, RestBindings};
+ _ ->
+ {error, notfound}
+ end.
+
+call(Msg) ->
+ case catch ?GEN_SERVER:call(?MODULE, Msg, ?CALL_TIMEOUT) of
+ {'EXIT', {timeout, _}} ->
+ {error, timeout};
+ {'EXIT', Why} ->
+ {error, Why};
+ Reply ->
+ Reply
+ end.
+
+make_status(notfound) ->
+ {404, esip:reason(404)};
+make_status(cseq_out_of_order) ->
+ {500, <<"CSeq is Out of Order">>};
+make_status(timeout) ->
+ {408, esip:reason(408)};
+make_status(too_many_sessions) ->
+ {503, <<"Too Many Registered Sessions">>};
+make_status(_) ->
+ {500, esip:reason(500)}.