diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ejabberd_auth_external.erl | 8 | ||||
-rw-r--r-- | src/ejabberd_c2s.erl | 840 | ||||
-rw-r--r-- | src/ejabberd_listener.erl | 51 | ||||
-rw-r--r-- | src/ejabberd_router.erl | 1 | ||||
-rw-r--r-- | src/ejabberd_sm.erl | 17 | ||||
-rw-r--r-- | src/ejabberd_system_monitor.erl | 5 | ||||
-rw-r--r-- | src/jlib.erl | 33 | ||||
-rw-r--r-- | src/mod_muc_room.erl | 43 | ||||
-rw-r--r-- | src/mod_offline.erl | 2 | ||||
-rw-r--r-- | src/mod_pubsub.erl | 71 | ||||
-rw-r--r-- | src/mod_pubsub_odbc.erl | 80 | ||||
-rw-r--r-- | src/mod_sip.erl | 279 | ||||
-rw-r--r-- | src/mod_sip_proxy.erl | 277 | ||||
-rw-r--r-- | src/mod_sip_registrar.erl | 336 |
14 files changed, 1819 insertions, 224 deletions
diff --git a/src/ejabberd_auth_external.erl b/src/ejabberd_auth_external.erl index 9ae6c9081..51c1c620a 100644 --- a/src/ejabberd_auth_external.erl +++ b/src/ejabberd_auth_external.erl @@ -54,10 +54,8 @@ start(Host) -> end, "extauth"), extauth:start(Host, Cmd), - case check_cache_last_options(Host) of - cache -> ok = ejabberd_auth_internal:start(Host); - no_cache -> ok - end. + check_cache_last_options(Host), + ejabberd_auth_internal:start(Host). check_cache_last_options(Server) -> case get_cache_option(Server) of @@ -173,7 +171,7 @@ get_cache_option(Host) -> case ejabberd_config:get_option( {extauth_cache, Host}, fun(false) -> undefined; - (I) when is_integer(I), I > 0 -> I + (I) when is_integer(I), I >= 0 -> I end) of undefined -> false; CacheTime -> {true, CacheTime} diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 87b32c43c..8874c48ad 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -57,6 +57,7 @@ wait_for_bind/2, wait_for_session/2, wait_for_sasl_response/2, + wait_for_resume/2, session_established/2, handle_event/3, handle_sync_event/4, @@ -107,6 +108,15 @@ auth_module = unknown, ip, aux_fields = [], + mgmt_state, + mgmt_xmlns, + mgmt_queue, + mgmt_max_queue, + mgmt_pending_since, + mgmt_timeout, + mgmt_resend, + mgmt_stanzas_in = 0, + mgmt_stanzas_out = 0, lang = <<"">>}). %-define(DBGFSM, true). @@ -155,6 +165,39 @@ -define(INVALID_FROM, ?SERR_INVALID_FROM). +%% XEP-0198: + +-define(IS_STREAM_MGMT_TAG(Name), + Name == <<"enable">>; + Name == <<"resume">>; + Name == <<"a">>; + Name == <<"r">>). + +-define(IS_SUPPORTED_MGMT_XMLNS(Xmlns), + Xmlns == ?NS_STREAM_MGMT_2; + Xmlns == ?NS_STREAM_MGMT_3). + +-define(MGMT_FAILED(Condition, Xmlns), + #xmlel{name = <<"failed">>, + attrs = [{<<"xmlns">>, Xmlns}], + children = [#xmlel{name = Condition, + attrs = [{<<"xmlns">>, ?NS_STANZAS}], + children = []}]}). + +-define(MGMT_BAD_REQUEST(Xmlns), + ?MGMT_FAILED(<<"bad-request">>, Xmlns)). + +-define(MGMT_ITEM_NOT_FOUND(Xmlns), + ?MGMT_FAILED(<<"item-not-found">>, Xmlns)). + +-define(MGMT_SERVICE_UNAVAILABLE(Xmlns), + ?MGMT_FAILED(<<"service-unavailable">>, Xmlns)). + +-define(MGMT_UNEXPECTED_REQUEST(Xmlns), + ?MGMT_FAILED(<<"unexpected-request">>, Xmlns)). + +-define(MGMT_UNSUPPORTED_VERSION(Xmlns), + ?MGMT_FAILED(<<"unsupported-version">>, Xmlns)). %%%---------------------------------------------------------------------- %%% API @@ -258,6 +301,19 @@ init([{SockMod, Socket}, Opts]) -> true -> TLSOpts2 end, TLSOpts = [verify_none | TLSOpts3], + StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true), + StreamMgmtState = if StreamMgmtEnabled -> inactive; + true -> disabled + end, + MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of + Limit when is_integer(Limit), Limit > 0 -> Limit; + _ -> 500 + end, + ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of + Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout; + _ -> 300 + end, + ResendOnTimeout = proplists:get_bool(resend_on_timeout, Opts), IP = peerip(SockMod, Socket), %% Check if IP is blacklisted: case is_ip_blacklisted(IP) of @@ -278,8 +334,12 @@ init([{SockMod, Socket}, Opts]) -> xml_socket = XMLSocket, zlib = Zlib, tls = TLS, tls_required = StartTLSRequired, tls_enabled = TLSEnabled, tls_options = TLSOpts, - streamid = new_id(), access = Access, - shaper = Shaper, ip = IP}, + sid = {now(), self()}, streamid = new_id(), + access = Access, shaper = Shaper, ip = IP, + mgmt_state = StreamMgmtState, + mgmt_max_queue = MaxAckQueue, + mgmt_timeout = ResumeTimeout, + mgmt_resend = ResendOnTimeout}, {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT} end. @@ -410,6 +470,18 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> ejabberd_hooks:run_fold(roster_get_versioning_feature, Server, [], [Server]), + StreamManagementFeature = + case stream_mgmt_enabled(StateData) of + true -> + [#xmlel{name = <<"sm">>, + attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_2}], + children = []}, + #xmlel{name = <<"sm">>, + attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_3}], + children = []}]; + false -> + [] + end, StreamFeatures = [#xmlel{name = <<"bind">>, attrs = [{<<"xmlns">>, ?NS_BIND}], children = []}, @@ -418,6 +490,7 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> children = []}] ++ RosterVersioningFeature ++ + StreamManagementFeature ++ ejabberd_hooks:run_fold(c2s_stream_features, Server, [], [Server]), send_element(StateData, @@ -480,6 +553,9 @@ wait_for_stream({xmlstreamerror, _}, StateData) -> wait_for_stream(closed, StateData) -> {stop, normal, StateData}. +wait_for_auth({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_auth, dispatch_stream_mgmt(El, StateData)); wait_for_auth({xmlstreamelement, El}, StateData) -> case is_auth_packet(El) of {auth, _ID, get, {U, _, _, _}} -> @@ -549,15 +625,15 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> ?INFO_MSG("(~w) Accepted legacy authentication for ~s by ~p", [StateData#state.socket, jlib:jid_to_string(JID), AuthModule]), - SID = {now(), self()}, - Conn = (StateData#state.sockmod):get_conn_type( - StateData#state.socket), + Conn = get_conn_type(StateData), Info = [{ip, StateData#state.ip}, {conn, Conn}, {auth_module, AuthModule}], Res = jlib:make_result_iq_reply( El#xmlel{children = []}), send_element(StateData, Res), - ejabberd_sm:open_session(SID, U, StateData#state.server, R, Info), + ejabberd_sm:open_session(StateData#state.sid, U, + StateData#state.server, R, + Info), change_shaper(StateData, JID), {Fs, Ts} = ejabberd_hooks:run_fold(roster_get_subscription_lists, @@ -575,7 +651,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> [U, StateData#state.server]), NewStateData = StateData#state{user = U, resource = R, - jid = JID, sid = SID, + jid = JID, conn = Conn, auth_module = AuthModule, pres_f = (?SETS):from_list(Fs1), @@ -626,6 +702,11 @@ wait_for_auth({xmlstreamerror, _}, StateData) -> wait_for_auth(closed, StateData) -> {stop, normal, StateData}. +wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El}, + StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_feature_request, + dispatch_stream_mgmt(El, StateData)); wait_for_feature_request({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -791,6 +872,9 @@ wait_for_feature_request({xmlstreamerror, _}, wait_for_feature_request(closed, StateData) -> {stop, normal, StateData}. +wait_for_sasl_response({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_sasl_response, dispatch_stream_mgmt(El, StateData)); wait_for_sasl_response({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -921,6 +1005,20 @@ resource_conflict_action(U, S, R) -> {accept_resource, Rnew} end. +wait_for_bind({xmlstreamelement, #xmlel{name = Name, attrs = Attrs} = El}, + StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + case Name of + <<"resume">> -> + case handle_resume(StateData, Attrs) of + {ok, ResumedState} -> + fsm_next_state(session_established, ResumedState); + error -> + fsm_next_state(wait_for_bind, StateData) + end; + _ -> + fsm_next_state(wait_for_bind, dispatch_stream_mgmt(El, StateData)) + end; wait_for_bind({xmlstreamelement, El}, StateData) -> case jlib:iq_query_info(El) of #iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} = @@ -982,61 +1080,63 @@ wait_for_bind({xmlstreamerror, _}, StateData) -> wait_for_bind(closed, StateData) -> {stop, normal, StateData}. +wait_for_session({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_session, dispatch_stream_mgmt(El, StateData)); wait_for_session({xmlstreamelement, El}, StateData) -> + NewStateData = update_num_stanzas_in(StateData, El), case jlib:iq_query_info(El) of #iq{type = set, xmlns = ?NS_SESSION} -> - U = StateData#state.user, - R = StateData#state.resource, - JID = StateData#state.jid, - case acl:match_rule(StateData#state.server, - StateData#state.access, JID) of + U = NewStateData#state.user, + R = NewStateData#state.resource, + JID = NewStateData#state.jid, + case acl:match_rule(NewStateData#state.server, + NewStateData#state.access, JID) of allow -> ?INFO_MSG("(~w) Opened session for ~s", - [StateData#state.socket, + [NewStateData#state.socket, jlib:jid_to_string(JID)]), Res = jlib:make_result_iq_reply(El#xmlel{children = []}), - send_element(StateData, Res), - change_shaper(StateData, JID), + NewState = send_stanza(NewStateData, Res), + change_shaper(NewState, JID), {Fs, Ts} = ejabberd_hooks:run_fold( roster_get_subscription_lists, - StateData#state.server, + NewState#state.server, {[], []}, - [U, StateData#state.server]), + [U, NewState#state.server]), LJID = jlib:jid_tolower(jlib:jid_remove_resource(JID)), Fs1 = [LJID | Fs], Ts1 = [LJID | Ts], PrivList = ejabberd_hooks:run_fold( - privacy_get_user_list, StateData#state.server, + privacy_get_user_list, NewState#state.server, #userlist{}, - [U, StateData#state.server]), - SID = {now(), self()}, - Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, StateData#state.auth_module}], + [U, NewState#state.server]), + Conn = get_conn_type(NewState), + Info = [{ip, NewState#state.ip}, {conn, Conn}, + {auth_module, NewState#state.auth_module}], ejabberd_sm:open_session( - SID, U, StateData#state.server, R, Info), - NewStateData = - StateData#state{ - sid = SID, + NewState#state.sid, U, NewState#state.server, R, Info), + UpdatedStateData = + NewState#state{ conn = Conn, pres_f = ?SETS:from_list(Fs1), pres_t = ?SETS:from_list(Ts1), privacy_list = PrivList}, fsm_next_state_pack(session_established, - NewStateData); + UpdatedStateData); _ -> ejabberd_hooks:run(forbidden_session_hook, - StateData#state.server, [JID]), + NewStateData#state.server, [JID]), ?INFO_MSG("(~w) Forbidden session for ~s", - [StateData#state.socket, + [NewStateData#state.socket, jlib:jid_to_string(JID)]), Err = jlib:make_error_reply(El, ?ERR_NOT_ALLOWED), - send_element(StateData, Err), - fsm_next_state(wait_for_session, StateData) + send_element(NewStateData, Err), + fsm_next_state(wait_for_session, NewStateData) end; _ -> - fsm_next_state(wait_for_session, StateData) + fsm_next_state(wait_for_session, NewStateData) end; wait_for_session(timeout, StateData) -> @@ -1050,6 +1150,9 @@ wait_for_session({xmlstreamerror, _}, StateData) -> wait_for_session(closed, StateData) -> {stop, normal, StateData}. +session_established({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(session_established, dispatch_stream_mgmt(El, StateData)); session_established({xmlstreamelement, El}, StateData) -> FromJID = StateData#state.jid, @@ -1081,6 +1184,12 @@ session_established({xmlstreamerror, _}, StateData) -> send_element(StateData, ?INVALID_XML_ERR), send_trailer(StateData), {stop, normal, StateData}; +session_established(closed, StateData) + when StateData#state.mgmt_timeout > 0, + StateData#state.mgmt_state == active orelse + StateData#state.mgmt_state == pending -> + log_pending_state(StateData), + fsm_next_state(wait_for_resume, StateData#state{mgmt_state = pending}); session_established(closed, StateData) -> {stop, normal, StateData}. @@ -1088,9 +1197,10 @@ session_established(closed, StateData) -> %% connection) session_established2(El, StateData) -> #xmlel{name = Name, attrs = Attrs} = El, - User = StateData#state.user, - Server = StateData#state.server, - FromJID = StateData#state.jid, + NewStateData = update_num_stanzas_in(StateData, El), + User = NewStateData#state.user, + Server = NewStateData#state.server, + FromJID = NewStateData#state.jid, To = xml:get_attr_s(<<"to">>, Attrs), ToJID = case To of <<"">> -> jlib:make_jid(User, Server, <<"">>); @@ -1099,7 +1209,7 @@ session_established2(El, StateData) -> NewEl1 = jlib:remove_attr(<<"xmlns">>, El), NewEl = case xml:get_attr_s(<<"xml:lang">>, Attrs) of <<"">> -> - case StateData#state.lang of + case NewStateData#state.lang of <<"">> -> NewEl1; Lang -> xml:replace_tag_attr(<<"xml:lang">>, Lang, NewEl1) @@ -1109,13 +1219,18 @@ session_established2(El, StateData) -> NewState = case ToJID of error -> case xml:get_attr_s(<<"type">>, Attrs) of - <<"error">> -> StateData; - <<"result">> -> StateData; + <<"error">> -> NewStateData; + <<"result">> -> NewStateData; _ -> Err = jlib:make_error_reply(NewEl, ?ERR_JID_MALFORMED), - send_element(StateData, Err), - StateData + case is_stanza(Err) of + true -> + send_stanza(NewStateData, Err); + false -> + send_element(NewStateData, Err), + NewStateData + end end; _ -> case Name of @@ -1130,12 +1245,12 @@ session_established2(El, StateData) -> #jid{user = User, server = Server, resource = <<"">>} -> ?DEBUG("presence_update(~p,~n\t~p,~n\t~p)", - [FromJID, PresenceEl, StateData]), + [FromJID, PresenceEl, NewStateData]), presence_update(FromJID, PresenceEl, - StateData); + NewStateData); _ -> presence_track(FromJID, ToJID, PresenceEl, - StateData) + NewStateData) end; <<"iq">> -> case jlib:iq_query_info(NewEl) of @@ -1143,27 +1258,38 @@ session_established2(El, StateData) -> when Xmlns == (?NS_PRIVACY); Xmlns == (?NS_BLOCKING) -> process_privacy_iq(FromJID, ToJID, IQ, - StateData); + NewStateData); _ -> ejabberd_hooks:run(user_send_packet, Server, [FromJID, ToJID, NewEl]), - check_privacy_route(FromJID, StateData, + check_privacy_route(FromJID, NewStateData, FromJID, ToJID, NewEl), - StateData + NewStateData end; <<"message">> -> ejabberd_hooks:run(user_send_packet, Server, [FromJID, ToJID, NewEl]), - check_privacy_route(FromJID, StateData, FromJID, + check_privacy_route(FromJID, NewStateData, FromJID, ToJID, NewEl), - StateData; - _ -> StateData + NewStateData; + _ -> NewStateData end end, ejabberd_hooks:run(c2s_loop_debug, [{xmlstreamelement, El}]), fsm_next_state(session_established, NewState). +wait_for_resume({xmlstreamelement, _El} = Event, StateData) -> + session_established(Event, StateData), + fsm_next_state(wait_for_resume, StateData); +wait_for_resume(timeout, StateData) -> + ?DEBUG("Timed out waiting for resumption of stream for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + {stop, normal, StateData}; +wait_for_resume(Event, StateData) -> + ?DEBUG("Ignoring event while waiting for resumption: ~p", [Event]), + fsm_next_state(wait_for_resume, StateData). + %%---------------------------------------------------------------------- %% Func: StateName/3 %% Returns: {next_state, NextStateName, NextStateData} | @@ -1208,6 +1334,15 @@ handle_sync_event(get_subscribed, _From, StateName, StateData) -> Subscribed = (?SETS):to_list(StateData#state.pres_f), {reply, Subscribed, StateName, StateData}; +handle_sync_event(resume_session, _From, _StateName, + StateData) -> + %% The old session should be closed before the new one is opened, so we do + %% this here instead of leaving it to the terminate callback + ejabberd_sm:close_session(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource), + {stop, normal, {ok, StateData}, StateData#state{mgmt_state = resumed}}; handle_sync_event(_Event, _From, StateName, StateData) -> Reply = ok, fsm_reply(Reply, StateName, StateData). @@ -1276,13 +1411,13 @@ handle_info({route, _From, _To, {broadcast, Data}}, jlib:jid_remove_resource(StateData#state.jid), StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)), - send_element(StateData, PrivPushEl), + NewState = send_stanza(StateData, PrivPushEl), fsm_next_state(StateName, - StateData#state{privacy_list = NewPL}) + NewState#state{privacy_list = NewPL}) end; {blocking, What} -> - route_blocking(What, StateData), - fsm_next_state(StateName, StateData); + NewState = route_blocking(What, StateData), + fsm_next_state(StateName, NewState); _ -> fsm_next_state(StateName, StateData) end; @@ -1528,12 +1663,12 @@ handle_info({route, From, To, jlib:replace_from_to_attrs(jlib:jid_to_string(From), jlib:jid_to_string(To), NewAttrs), FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els}, - send_element(StateData, FixedPacket), + SentStateData = send_stanza(StateData, FixedPacket), ejabberd_hooks:run(user_receive_packet, - StateData#state.server, - [StateData#state.jid, From, To, FixedPacket]), + SentStateData#state.server, + [SentStateData#state.jid, From, To, FixedPacket]), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, NewState); + fsm_next_state(StateName, SentStateData); true -> ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), fsm_next_state(StateName, NewState) @@ -1541,7 +1676,15 @@ handle_info({route, From, To, handle_info({'DOWN', Monitor, _Type, _Object, _Info}, _StateName, StateData) when Monitor == StateData#state.socket_monitor -> - {stop, normal, StateData}; + if StateData#state.mgmt_timeout > 0, + StateData#state.mgmt_state == active orelse + StateData#state.mgmt_state == pending -> + log_pending_state(StateData), + fsm_next_state(wait_for_resume, + StateData#state{mgmt_state = pending}); + true -> + {stop, normal, StateData} + end; handle_info(system_shutdown, StateName, StateData) -> case StateName of wait_for_stream -> @@ -1605,60 +1748,71 @@ print_state(State = #state{pres_t = T, pres_f = F, pres_a = A, pres_i = I}) -> %% Returns: any %%---------------------------------------------------------------------- terminate(_Reason, StateName, StateData) -> - case StateName of - session_established -> - case StateData#state.authenticated of - replaced -> - ?INFO_MSG("(~w) Replaced session for ~s", - [StateData#state.socket, - jlib:jid_to_string(StateData#state.jid)]), - From = StateData#state.jid, - Packet = #xmlel{name = <<"presence">>, - attrs = [{<<"type">>, <<"unavailable">>}], - children = - [#xmlel{name = <<"status">>, attrs = [], - children = - [{xmlcdata, - <<"Replaced by new connection">>}]}]}, - ejabberd_sm:close_session_unset_presence(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, - <<"Replaced by new connection">>), - presence_broadcast(StateData, From, - StateData#state.pres_a, Packet), - presence_broadcast(StateData, From, - StateData#state.pres_i, Packet); - _ -> - ?INFO_MSG("(~w) Close session for ~s", - [StateData#state.socket, - jlib:jid_to_string(StateData#state.jid)]), - EmptySet = (?SETS):new(), - case StateData of - #state{pres_last = undefined, pres_a = EmptySet, pres_i = EmptySet, pres_invis = false} -> - ejabberd_sm:close_session(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource); - _ -> - From = StateData#state.jid, - Packet = #xmlel{name = <<"presence">>, - attrs = [{<<"type">>, <<"unavailable">>}], - children = []}, - ejabberd_sm:close_session_unset_presence(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, - <<"">>), - presence_broadcast(StateData, From, - StateData#state.pres_a, Packet), - presence_broadcast(StateData, From, - StateData#state.pres_i, Packet) - end - end, - bounce_messages(); + case StateData#state.mgmt_state of + resumed -> + ?INFO_MSG("Closing former stream of resumed session for ~s", + [jlib:jid_to_string(StateData#state.jid)]); _ -> - ok + if StateName == session_established; + StateName == wait_for_resume -> + case StateData#state.authenticated of + replaced -> + ?INFO_MSG("(~w) Replaced session for ~s", + [StateData#state.socket, + jlib:jid_to_string(StateData#state.jid)]), + From = StateData#state.jid, + Packet = #xmlel{name = <<"presence">>, + attrs = [{<<"type">>, <<"unavailable">>}], + children = + [#xmlel{name = <<"status">>, attrs = [], + children = + [{xmlcdata, + <<"Replaced by new connection">>}]}]}, + ejabberd_sm:close_session_unset_presence(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource, + <<"Replaced by new connection">>), + presence_broadcast(StateData, From, + StateData#state.pres_a, Packet), + presence_broadcast(StateData, From, + StateData#state.pres_i, Packet), + handle_unacked_stanzas(StateData); + _ -> + ?INFO_MSG("(~w) Close session for ~s", + [StateData#state.socket, + jlib:jid_to_string(StateData#state.jid)]), + EmptySet = (?SETS):new(), + case StateData of + #state{pres_last = undefined, + pres_a = EmptySet, + pres_i = EmptySet, + pres_invis = false} -> + ejabberd_sm:close_session(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource); + _ -> + From = StateData#state.jid, + Packet = #xmlel{name = <<"presence">>, + attrs = [{<<"type">>, <<"unavailable">>}], + children = []}, + ejabberd_sm:close_session_unset_presence(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource, + <<"">>), + presence_broadcast(StateData, From, + StateData#state.pres_a, Packet), + presence_broadcast(StateData, From, + StateData#state.pres_i, Packet) + end, + handle_unacked_stanzas(StateData) + end, + bounce_messages(); + true -> + ok + end end, (StateData#state.sockmod):close(StateData#state.socket), ok. @@ -1673,6 +1827,8 @@ change_shaper(StateData, JID) -> (StateData#state.sockmod):change_shaper(StateData#state.socket, Shaper). +send_text(StateData, Text) when StateData#state.mgmt_state == pending -> + ?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]); send_text(StateData, Text) when StateData#state.xml_socket -> ?DEBUG("Send Text on stream = ~p", [Text]), (StateData#state.sockmod):send_xml(StateData#state.socket, @@ -1681,12 +1837,23 @@ send_text(StateData, Text) -> ?DEBUG("Send XML on stream = ~p", [Text]), (StateData#state.sockmod):send(StateData#state.socket, Text). +send_element(StateData, El) when StateData#state.mgmt_state == pending -> + ?DEBUG("Cannot send element while waiting for resumption: ~p", [El]); send_element(StateData, El) when StateData#state.xml_socket -> (StateData#state.sockmod):send_xml(StateData#state.socket, {xmlstreamelement, El}); send_element(StateData, El) -> send_text(StateData, xml:element_to_binary(El)). +send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending -> + mgmt_queue_add(StateData, Stanza); +send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active -> + send_stanza_and_ack_req(StateData, Stanza), + mgmt_queue_add(StateData, Stanza); +send_stanza(StateData, Stanza) -> + send_element(StateData, Stanza), + StateData. + send_header(StateData, Server, Version, Lang) when StateData#state.xml_socket -> VersionAttr = case Version of @@ -1722,6 +1889,9 @@ send_header(StateData, Server, Version, Lang) -> send_text(StateData, iolist_to_binary(Header)). send_trailer(StateData) + when StateData#state.mgmt_state == pending -> + ?DEBUG("Cannot send stream trailer while waiting for resumption", []); +send_trailer(StateData) when StateData#state.xml_socket -> (StateData#state.sockmod):send_xml(StateData#state.socket, {xmlstreamend, <<"stream:stream">>}); @@ -1740,6 +1910,19 @@ is_auth_packet(El) -> _ -> false end. +is_stanza(#xmlel{name = Name, attrs = Attrs}) when Name == <<"message">>; + Name == <<"presence">>; + Name == <<"iq">> -> + case xml:get_attr(<<"xmlns">>, Attrs) of + {value, NS} when NS /= <<"jabber:client">>, + NS /= <<"jabber:server">> -> + false; + _ -> + true + end; +is_stanza(_El) -> + false. + get_auth_tags([#xmlel{name = Name, children = Els} | L], U, P, D, R) -> CData = xml:get_cdata(Els), @@ -1867,12 +2050,12 @@ presence_update(From, Packet, StateData) -> ejabberd_hooks:run(user_available_hook, NewStateData#state.server, [NewStateData#state.jid]), - if NewPriority >= 0 -> - resend_offline_messages(NewStateData), - resend_subscription_requests(NewStateData); - true -> ok - end, - presence_broadcast_first(From, NewStateData, + ResentStateData = if NewPriority >= 0 -> + resend_offline_messages(NewStateData), + resend_subscription_requests(NewStateData); + true -> NewStateData + end, + presence_broadcast_first(From, ResentStateData, Packet); true -> presence_broadcast_to_trusted(NewStateData, From, @@ -2169,11 +2352,6 @@ resend_offline_messages(StateData) -> end, if Pass -> ejabberd_router:route(From, To, Packet); - %% send_element(StateData, FixedPacket), - %% ejabberd_hooks:run(user_receive_packet, - %% StateData#state.server, - %% [StateData#state.jid, - %% From, To, FixedPacket]); true -> ok end end, @@ -2186,10 +2364,11 @@ resend_subscription_requests(#state{user = User, PendingSubscriptions = ejabberd_hooks:run_fold(resend_subscription_requests_hook, Server, [], [User, Server]), - lists:foreach(fun (XMLPacket) -> - send_element(StateData, XMLPacket) - end, - PendingSubscriptions). + lists:foldl(fun (XMLPacket, AccStateData) -> + send_stanza(AccStateData, XMLPacket) + end, + StateData, + PendingSubscriptions). get_showtag(undefined) -> <<"unavailable">>; get_showtag(Presence) -> @@ -2268,6 +2447,15 @@ fsm_next_state_gc(StateName, PackedStateData) -> fsm_next_state(session_established, StateData) -> {next_state, session_established, StateData, ?C2S_HIBERNATE_TIMEOUT}; +fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined} = + StateData) -> + {next_state, wait_for_resume, + StateData#state{mgmt_pending_since = os:timestamp()}, + StateData#state.mgmt_timeout}; +fsm_next_state(wait_for_resume, StateData) -> + Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since), + Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1), + {next_state, wait_for_resume, StateData, Timeout}; fsm_next_state(StateName, StateData) -> {next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}. @@ -2276,6 +2464,15 @@ fsm_next_state(StateName, StateData) -> fsm_reply(Reply, session_established, StateData) -> {reply, Reply, session_established, StateData, ?C2S_HIBERNATE_TIMEOUT}; +fsm_reply(Reply, wait_for_resume, #state{mgmt_pending_since = undefined} = + StateData) -> + {reply, Reply, wait_for_resume, + StateData#state{mgmt_pending_since = os:timestamp()}, + StateData#state.mgmt_timeout}; +fsm_reply(Reply, wait_for_resume, StateData) -> + Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since), + Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1), + {reply, Reply, wait_for_resume, StateData, Timeout}; fsm_reply(Reply, StateName, StateData) -> {reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}. @@ -2365,12 +2562,399 @@ route_blocking(What, StateData) -> PrivPushEl = jlib:replace_from_to(jlib:jid_remove_resource(StateData#state.jid), StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)), - send_element(StateData, PrivPushEl), %% No need to replace active privacy list here, %% blocking pushes are always accompanied by %% Privacy List pushes + send_stanza(StateData, PrivPushEl). + +%%%---------------------------------------------------------------------- +%%% XEP-0198 +%%%---------------------------------------------------------------------- + +stream_mgmt_enabled(#state{mgmt_state = disabled}) -> + false; +stream_mgmt_enabled(_StateData) -> + true. + +dispatch_stream_mgmt(El, StateData) + when StateData#state.mgmt_state == active; + StateData#state.mgmt_state == pending -> + perform_stream_mgmt(El, StateData); +dispatch_stream_mgmt(El, StateData) -> + negotiate_stream_mgmt(El, StateData). + +negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) -> + %% XEP-0198 says: "For client-to-server connections, the client MUST NOT + %% attempt to enable stream management until after it has completed Resource + %% Binding unless it is resuming a previous session". However, it also + %% says: "Stream management errors SHOULD be considered recoverable", so we + %% won't bail out. + send_element(StateData, ?MGMT_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)), + StateData; +negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when ?IS_SUPPORTED_MGMT_XMLNS(Xmlns) -> + case stream_mgmt_enabled(StateData) of + true -> + case Name of + <<"enable">> -> + handle_enable(StateData#state{mgmt_xmlns = Xmlns}, Attrs); + _ -> + Res = if Name == <<"a">>; + Name == <<"r">>; + Name == <<"resume">> -> + ?MGMT_UNEXPECTED_REQUEST(Xmlns); + true -> + ?MGMT_BAD_REQUEST(Xmlns) + end, + send_element(StateData, Res), + StateData + end; + false -> + send_element(StateData, ?MGMT_SERVICE_UNAVAILABLE(Xmlns)), + StateData + end; + _ -> + send_element(StateData, ?MGMT_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)), + StateData + end. + +perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when Xmlns == StateData#state.mgmt_xmlns -> + case Name of + <<"r">> -> + handle_r(StateData); + <<"a">> -> + handle_a(StateData, Attrs); + _ -> + Res = if Name == <<"enable">>; + Name == <<"resume">> -> + ?MGMT_UNEXPECTED_REQUEST(Xmlns); + true -> + ?MGMT_BAD_REQUEST(Xmlns) + end, + send_element(StateData, Res), + StateData + end; + _ -> + send_element(StateData, + ?MGMT_UNSUPPORTED_VERSION(StateData#state.mgmt_xmlns)), + StateData + end. + +handle_enable(#state{mgmt_timeout = ConfigTimeout} = StateData, Attrs) -> + Timeout = case xml:get_attr_s(<<"resume">>, Attrs) of + ResumeAttr when ResumeAttr == <<"true">>; + ResumeAttr == <<"1">> -> + MaxAttr = xml:get_attr_s(<<"max">>, Attrs), + case catch jlib:binary_to_integer(MaxAttr) of + Max when is_integer(Max), Max > 0, Max =< ConfigTimeout -> + Max; + _ -> + ConfigTimeout + end; + _ -> + 0 + end, + ResAttrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}] ++ + if Timeout > 0 -> + ?INFO_MSG("Stream management with resumption enabled for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + [{<<"id">>, make_resume_id(StateData)}, + {<<"resume">>, <<"true">>}, + {<<"max">>, jlib:integer_to_binary(Timeout)}]; + true -> + ?INFO_MSG("Stream management without resumption enabled for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + [] + end, + Res = #xmlel{name = <<"enabled">>, + attrs = ResAttrs, + children = []}, + send_element(StateData, Res), + StateData#state{mgmt_state = active, + mgmt_queue = queue:new(), + mgmt_timeout = Timeout * 1000}. + +handle_r(StateData) -> + H = jlib:integer_to_binary(StateData#state.mgmt_stanzas_in), + Res = #xmlel{name = <<"a">>, + attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}, + {<<"h">>, H}], + children = []}, + send_element(StateData, Res), + StateData. + +handle_a(#state{jid = JID, mgmt_stanzas_out = NumStanzasOut} = StateData, + Attrs) -> + case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of + H when is_integer(H), H >= 0 -> + ?DEBUG("~s acknowledged ~B of ~B stanzas", + [jlib:jid_to_string(JID), H, NumStanzasOut]), + mgmt_queue_drop(StateData, H); + _ -> + ?WARNING_MSG("Ignoring invalid ACK element from ~s", + [jlib:jid_to_string(JID)]), + StateData + end. + +handle_resume(StateData, Attrs) -> + R = case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when ?IS_SUPPORTED_MGMT_XMLNS(Xmlns) -> + case stream_mgmt_enabled(StateData) of + true -> + case {xml:get_attr(<<"previd">>, Attrs), + catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs))} + of + {{value, PrevID}, H} when is_integer(H) -> + case inherit_session_state(StateData, PrevID) of + {ok, InheritedState} -> + {ok, InheritedState, H}; + {error, Err} -> + {error, ?MGMT_ITEM_NOT_FOUND(Xmlns), Err} + end; + _ -> + {error, ?MGMT_BAD_REQUEST(Xmlns), + <<"Invalid request">>} + end; + false -> + {error, ?MGMT_SERVICE_UNAVAILABLE(Xmlns), + <<"XEP-0198 disabled">>} + end; + _ -> + {error, ?MGMT_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3), + <<"Invalid XMLNS">>} + end, + case R of + {ok, ResumedState, NumHandled} -> + NewState = mgmt_queue_drop(ResumedState, NumHandled), + AttrXmlns = NewState#state.mgmt_xmlns, + AttrId = make_resume_id(NewState), + AttrH = jlib:integer_to_binary(NewState#state.mgmt_stanzas_in), + send_element(NewState, + #xmlel{name = <<"resumed">>, + attrs = [{<<"xmlns">>, AttrXmlns}, + {<<"h">>, AttrH}, + {<<"previd">>, AttrId}], + children = []}), + SendFun = fun(_F, _T, El) -> send_element(NewState, El) end, + handle_unacked_stanzas(NewState, SendFun), + send_element(NewState, + #xmlel{name = <<"r">>, + attrs = [{<<"xmlns">>, AttrXmlns}], + children = []}), + ?INFO_MSG("Resumed session for ~s", + [jlib:jid_to_string(NewState#state.jid)]), + {ok, NewState}; + {error, El, Msg} -> + send_element(StateData, El), + ?INFO_MSG("Cannot resume session for ~s@~s: ~s", + [StateData#state.user, StateData#state.server, Msg]), + error + end. + +update_num_stanzas_in(#state{mgmt_state = active} = StateData, El) -> + NewNum = case {is_stanza(El), StateData#state.mgmt_stanzas_in} of + {true, 4294967295} -> + 0; + {true, Num} -> + Num + 1; + {false, Num} -> + Num + end, + StateData#state{mgmt_stanzas_in = NewNum}; +update_num_stanzas_in(StateData, _El) -> + StateData. + +send_stanza_and_ack_req(StateData, Stanza) -> + AckReq = #xmlel{name = <<"r">>, + attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}], + children = []}, + StanzaS = xml:element_to_binary(Stanza), + AckReqS = xml:element_to_binary(AckReq), + send_text(StateData, [StanzaS, AckReqS]). + +mgmt_queue_add(StateData, El) -> + NewNum = case StateData#state.mgmt_stanzas_out of + 4294967295 -> + 0; + Num -> + Num + 1 + end, + NewState = limit_queue_length(StateData), + NewQueue = queue:in({NewNum, El}, NewState#state.mgmt_queue), + NewState#state{mgmt_queue = NewQueue, mgmt_stanzas_out = NewNum}. + +mgmt_queue_drop(StateData, NumHandled) -> + NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end, + StateData#state.mgmt_queue), + StateData#state{mgmt_queue = NewQueue}. + +limit_queue_length(#state{mgmt_max_queue = Limit} = StateData) + when Limit == infinity; + Limit == unlimited -> + StateData; +limit_queue_length(#state{jid = JID, + mgmt_queue = Queue, + mgmt_max_queue = Limit} = StateData) -> + case queue:len(Queue) >= Limit of + true -> + ?WARNING_MSG("Dropping stanza from too long ACK queue for ~s", + [jlib:jid_to_string(JID)]), + limit_queue_length(StateData#state{mgmt_queue = queue:drop(Queue)}); + false -> + StateData + end. + +log_pending_state(StateData) when StateData#state.mgmt_state /= pending -> + ?INFO_MSG("Waiting for resumption of stream for ~s", + [jlib:jid_to_string(StateData#state.jid)]); +log_pending_state(_StateData) -> + ok. + +handle_unacked_stanzas(StateData, F) + when StateData#state.mgmt_state == active; + StateData#state.mgmt_state == pending -> + Queue = StateData#state.mgmt_queue, + case queue:len(Queue) of + 0 -> + ok; + N -> + ?INFO_MSG("~B stanzas were not acknowledged by ~s", + [N, jlib:jid_to_string(StateData#state.jid)]), + lists:foreach( + fun({_, #xmlel{attrs = Attrs} = El}) -> + From_s = xml:get_attr_s(<<"from">>, Attrs), + From = jlib:string_to_jid(From_s), + To_s = xml:get_attr_s(<<"to">>, Attrs), + To = jlib:string_to_jid(To_s), + F(From, To, El) + end, queue:to_list(Queue)) + end; +handle_unacked_stanzas(_StateData, _F) -> + ok. + +handle_unacked_stanzas(StateData) + when StateData#state.mgmt_state == active; + StateData#state.mgmt_state == pending -> + ReRoute = case StateData#state.mgmt_resend of + true -> + fun ejabberd_router:route/3; + false -> + fun(From, To, El) -> + Err = + jlib:make_error_reply(El, + ?ERR_SERVICE_UNAVAILABLE), + ejabberd_router:route(To, From, Err) + end + end, + F = fun(From, To, El) -> + %% We'll drop the stanza if it was <forwarded/> by some + %% encapsulating protocol as per XEP-0297. One such protocol is + %% XEP-0280, which says: "When a receiving server attempts to + %% deliver a forked message, and that message bounces with an + %% error for any reason, the receiving server MUST NOT forward + %% that error back to the original sender." Resending such a + %% stanza could easily lead to unexpected results as well. + case is_encapsulated_forward(El) of + true -> + ?DEBUG("Dropping forwarded stanza from ~s", + [xml:get_attr_s(<<"from">>, El#xmlel.attrs)]); + false -> + ReRoute(From, To, El) + end + end, + handle_unacked_stanzas(StateData, F); +handle_unacked_stanzas(_StateData) -> ok. +is_encapsulated_forward(#xmlel{name = <<"message">>} = El) -> + SubTag = case {xml:get_subtag(El, <<"sent">>), + xml:get_subtag(El, <<"received">>), + xml:get_subtag(El, <<"result">>)} of + {false, false, false} -> + false; + {Tag, false, false} -> + Tag; + {false, Tag, false} -> + Tag; + {_, _, Tag} -> + Tag + end, + if SubTag == false -> + false; + true -> + case xml:get_subtag(SubTag, <<"forwarded">>) of + false -> + false; + _ -> + true + end + end; +is_encapsulated_forward(_El) -> + false. + +inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> + case jlib:base64_to_term(ResumeID) of + {term, {U, S, R, Time}} -> + case ejabberd_sm:get_session_pid(U, S, R) of + none -> + {error, <<"Previous session PID not found">>}; + OldPID -> + OldSID = {Time, OldPID}, + case catch resume_session(OldPID) of + {ok, #state{sid = OldSID} = OldStateData} -> + NewSID = {Time, self()}, % Old time, new PID + Priority = case OldStateData#state.pres_last of + undefined -> + 0; + Presence -> + get_priority_from_presence(Presence) + end, + Conn = get_conn_type(StateData), + Info = [{ip, StateData#state.ip}, {conn, Conn}, + {auth_module, StateData#state.auth_module}], + ejabberd_sm:open_session(NewSID, U, S, R, + Priority, Info), + {ok, StateData#state{sid = NewSID, + jid = OldStateData#state.jid, + resource = OldStateData#state.resource, + pres_t = OldStateData#state.pres_t, + pres_f = OldStateData#state.pres_f, + pres_a = OldStateData#state.pres_a, + pres_i = OldStateData#state.pres_i, + pres_last = OldStateData#state.pres_last, + pres_pri = OldStateData#state.pres_pri, + pres_timestamp = OldStateData#state.pres_timestamp, + pres_invis = OldStateData#state.pres_invis, + privacy_list = OldStateData#state.privacy_list, + aux_fields = OldStateData#state.aux_fields, + mgmt_xmlns = OldStateData#state.mgmt_xmlns, + mgmt_queue = OldStateData#state.mgmt_queue, + mgmt_timeout = OldStateData#state.mgmt_timeout, + mgmt_stanzas_in = OldStateData#state.mgmt_stanzas_in, + mgmt_stanzas_out = OldStateData#state.mgmt_stanzas_out, + mgmt_state = active}}; + _ -> + {error, <<"Cannot grab session state">>} + end + end; + error -> + {error, <<"Invalid 'previd' value">>} + end. + +resume_session(FsmRef) -> + (?GEN_FSM):sync_send_all_state_event(FsmRef, resume_session, 3000). + +make_resume_id(StateData) -> + {Time, _} = StateData#state.sid, + ID = {StateData#state.user, + StateData#state.server, + StateData#state.resource, + Time}, + jlib:term_to_base64(ID). + %%%---------------------------------------------------------------------- %%% JID Set memory footprint reduction code %%%---------------------------------------------------------------------- diff --git a/src/ejabberd_listener.erl b/src/ejabberd_listener.erl index 2051afdb2..844080a04 100644 --- a/src/ejabberd_listener.erl +++ b/src/ejabberd_listener.erl @@ -151,7 +151,20 @@ init_udp(PortIP, Module, Opts, SockOpts, Port, IPS) -> {ok, Socket} -> %% Inform my parent that this port was opened succesfully proc_lib:init_ack({ok, self()}), - udp_recv(Socket, Module, Opts); + case erlang:function_exported(Module, udp_init, 2) of + false -> + udp_recv(Socket, Module, Opts); + true -> + case catch Module:udp_init(Socket, Opts) of + {'EXIT', _} = Err -> + ?ERROR_MSG("failed to process callback function " + "~p:~s(~p, ~p): ~p", + [Module, udp_init, Socket, Opts, Err]), + udp_recv(Socket, Module, Opts); + NewOpts -> + udp_recv(Socket, Module, NewOpts) + end + end; {error, Reason} -> socket_error(Reason, PortIP, Module, SockOpts, Port, IPS) end. @@ -160,8 +173,20 @@ init_tcp(PortIP, Module, Opts, SockOpts, Port, IPS) -> ListenSocket = listen_tcp(PortIP, Module, SockOpts, Port, IPS), %% Inform my parent that this port was opened succesfully proc_lib:init_ack({ok, self()}), - %% And now start accepting connection attempts - accept(ListenSocket, Module, Opts). + case erlang:function_exported(Module, tcp_init, 2) of + false -> + accept(ListenSocket, Module, Opts); + true -> + case catch Module:tcp_init(ListenSocket, Opts) of + {'EXIT', _} = Err -> + ?ERROR_MSG("failed to process callback function " + "~p:~s(~p, ~p): ~p", + [Module, tcp_init, ListenSocket, Opts, Err]), + accept(ListenSocket, Module, Opts); + NewOpts -> + accept(ListenSocket, Module, NewOpts) + end + end. listen_tcp(PortIP, Module, SockOpts, Port, IPS) -> case ets:lookup(listen_sockets, PortIP) of @@ -311,11 +336,11 @@ udp_recv(Socket, Module, Opts) -> ?ERROR_MSG("failed to process UDP packet:~n" "** Source: {~p, ~p}~n" "** Reason: ~p~n** Packet: ~p", - [Addr, Port, Reason, Packet]); - _ -> - ok - end, - udp_recv(Socket, Module, Opts); + [Addr, Port, Reason, Packet]), + udp_recv(Socket, Module, Opts); + NewOpts -> + udp_recv(Socket, Module, NewOpts) + end; {error, Reason} -> ?ERROR_MSG("unexpected UDP error: ~s", [format_error(Reason)]), throw({error, Reason}) @@ -342,6 +367,7 @@ start_listener2(Port, Module, Opts) -> %% But it doesn't hurt to attempt to start it for any listener. %% So, it's normal (and harmless) that in most cases this call returns: {error, {already_started, pid()}} maybe_start_stun(Module), + maybe_start_sip(Module), start_module_sup(Port, Module), start_listener_sup(Port, Module, Opts). @@ -463,6 +489,11 @@ maybe_start_stun(ejabberd_stun) -> maybe_start_stun(_) -> ok. +maybe_start_sip(esip_socket) -> + ejabberd:start_app(esip); +maybe_start_sip(_) -> + ok. + %%% %%% Check options %%% @@ -642,7 +673,11 @@ prepare_ip(IP) when is_binary(IP) -> prepare_mod(ejabberd_stun) -> prepare_mod(stun); +prepare_mod(ejabberd_sip) -> + prepare_mod(sip); prepare_mod(stun) -> stun; +prepare_mod(sip) -> + esip_socket; prepare_mod(Mod) when is_atom(Mod) -> Mod. diff --git a/src/ejabberd_router.erl b/src/ejabberd_router.erl index caf444fba..70a01ee4e 100644 --- a/src/ejabberd_router.erl +++ b/src/ejabberd_router.erl @@ -396,6 +396,7 @@ update_tables() -> [domain, node, pid] -> mnesia:delete_table(route); [domain, pid] -> mnesia:delete_table(route); [domain, pid, local_hint] -> ok; + [domain, pid, local_hint|_] -> mnesia:delete_table(route); {'EXIT', _} -> ok end, case lists:member(local_route, diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 58debf0c1..094918cd9 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -32,7 +32,9 @@ %% API -export([start_link/0, route/3, - open_session/5, close_session/4, + open_session/5, + open_session/6, + close_session/4, check_in_subscription/6, bounce_offline_message/3, disconnect_removed_user/2, @@ -56,6 +58,7 @@ get_session_pid/3, get_user_info/3, get_user_ip/3, + get_max_user_sessions/2, is_existing_resource/3 ]). @@ -108,10 +111,10 @@ route(From, To, Packet) -> _ -> ok end. --spec open_session(sid(), binary(), binary(), binary(), info()) -> ok. +-spec open_session(sid(), binary(), binary(), binary(), prio(), info()) -> ok. -open_session(SID, User, Server, Resource, Info) -> - set_session(SID, User, Server, Resource, undefined, Info), +open_session(SID, User, Server, Resource, Priority, Info) -> + set_session(SID, User, Server, Resource, Priority, Info), mnesia:dirty_update_counter(session_counter, jlib:nameprep(Server), 1), check_for_sessions_to_replace(User, Server, Resource), @@ -119,6 +122,11 @@ open_session(SID, User, Server, Resource, Info) -> ejabberd_hooks:run(sm_register_connection_hook, JID#jid.lserver, [SID, JID, Info]). +-spec open_session(sid(), binary(), binary(), binary(), info()) -> ok. + +open_session(SID, User, Server, Resource, Info) -> + open_session(SID, User, Server, Resource, undefined, Info). + -spec close_session(sid(), binary(), binary(), binary()) -> ok. close_session(SID, User, Server, Resource) -> @@ -842,6 +850,7 @@ update_tables() -> [ur, user, node] -> mnesia:delete_table(session); [ur, user, pid] -> mnesia:delete_table(session); [usr, us, pid] -> mnesia:delete_table(session); + [usr, us, sid, priority, info] -> mnesia:delete_table(session); [sid, usr, us, priority] -> mnesia:delete_table(session); [sid, usr, us, priority, info] -> ok; diff --git a/src/ejabberd_system_monitor.erl b/src/ejabberd_system_monitor.erl index 011c02c40..368c5a0ff 100644 --- a/src/ejabberd_system_monitor.erl +++ b/src/ejabberd_system_monitor.erl @@ -244,9 +244,8 @@ s2s_out_info(Pid) -> [<<"Process type: s2s_out">>, case FromTo of [{From, To}] -> - <<"\n", - (io_lib:format("S2S connection: from ~s to ~s", - [From, To]))/binary>>; + list_to_binary(io_lib:format("\nS2S connection: from ~s to ~s", + [From, To])); _ -> <<"">> end, check_send_queue(Pid), <<"\n">>, diff --git a/src/jlib.erl b/src/jlib.erl index 0ff210652..7735d7dbc 100644 --- a/src/jlib.erl +++ b/src/jlib.erl @@ -45,12 +45,13 @@ timestamp_to_iso/2, timestamp_to_xml/4, timestamp_to_xml/1, now_to_utc_string/1, now_to_local_string/1, datetime_string_to_timestamp/1, + term_to_base64/1, base64_to_term/1, decode_base64/1, encode_base64/1, ip_to_list/1, rsm_encode/1, rsm_encode/2, rsm_decode/1, binary_to_integer/1, binary_to_integer/2, integer_to_binary/1, integer_to_binary/2, atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1, - l2i/1, i2l/1, i2l/2]). + l2i/1, i2l/1, i2l/2, queue_drop_while/2]). %% TODO: Remove once XEP-0091 is Obsolete %% TODO: Remove once XEP-0091 is Obsolete @@ -779,6 +780,21 @@ check_list(List) -> % Base64 stuff (based on httpd_util.erl) % +-spec term_to_base64(term()) -> binary(). + +term_to_base64(Term) -> + encode_base64(term_to_binary(Term)). + +-spec base64_to_term(binary()) -> {term, term()} | error. + +base64_to_term(Base64) -> + case catch binary_to_term(decode_base64(Base64), [safe]) of + {'EXIT', _} -> + error; + Term -> + {term, Term} + end. + -spec decode_base64(binary()) -> binary(). decode_base64(S) -> @@ -893,3 +909,18 @@ i2l(L, N) when is_binary(L) -> C when C > N -> L; _ -> i2l(<<$0, L/binary>>, N) end. + +-spec queue_drop_while(fun((term()) -> boolean()), queue()) -> queue(). + +queue_drop_while(F, Q) -> + case queue:peek(Q) of + {value, Item} -> + case F(Item) of + true -> + queue_drop_while(F, queue:drop(Q)); + _ -> + Q + end; + empty -> + Q + end. diff --git a/src/mod_muc_room.erl b/src/mod_muc_room.erl index 461ab1da2..3842fde40 100644 --- a/src/mod_muc_room.erl +++ b/src/mod_muc_room.erl @@ -245,7 +245,7 @@ normal_state({route, From, <<"">>, NewState = expulse_participant(Packet, From, StateData, translate:translate(Lang, ErrorText)), - {next_state, normal_state, NewState}; + close_room_if_temporary_and_empty(NewState); _ -> {next_state, normal_state, StateData} end; <<"chat">> -> @@ -418,12 +418,13 @@ normal_state({route, From, <<"">>, StateData) -> case jlib:iq_query_info(Packet) of #iq{type = Type, xmlns = XMLNS, lang = Lang, - sub_el = SubEl} = + sub_el = #xmlel{name = SubElName} = SubEl} = IQ when (XMLNS == (?NS_MUC_ADMIN)) or (XMLNS == (?NS_MUC_OWNER)) or (XMLNS == (?NS_DISCO_INFO)) or (XMLNS == (?NS_DISCO_ITEMS)) + or (XMLNS == (?NS_VCARD)) or (XMLNS == (?NS_CAPTCHA)) -> Res1 = case XMLNS of ?NS_MUC_ADMIN -> @@ -434,6 +435,8 @@ normal_state({route, From, <<"">>, process_iq_disco_info(From, Type, Lang, StateData); ?NS_DISCO_ITEMS -> process_iq_disco_items(From, Type, Lang, StateData); + ?NS_VCARD -> + process_iq_vcard(From, Type, Lang, SubEl, StateData); ?NS_CAPTCHA -> process_iq_captcha(From, Type, Lang, SubEl, StateData) end, @@ -441,7 +444,7 @@ normal_state({route, From, <<"">>, {result, Res, SD} -> {IQ#iq{type = result, sub_el = - [#xmlel{name = <<"query">>, + [#xmlel{name = SubElName, attrs = [{<<"xmlns">>, XMLNS}], @@ -1123,14 +1126,17 @@ process_presence(From, Nick, end; _ -> StateData end, + close_room_if_temporary_and_empty(StateData1). + +close_room_if_temporary_and_empty(StateData1) -> case not (StateData1#state.config)#config.persistent andalso (?DICT):to_list(StateData1#state.users) == [] of true -> ?INFO_MSG("Destroyed MUC room ~s because it's temporary " "and empty", - [jlib:jid_to_string(StateData#state.jid)]), - add_to_log(room_existence, destroyed, StateData), + [jlib:jid_to_string(StateData1#state.jid)]), + add_to_log(room_existence, destroyed, StateData1), {stop, normal, StateData1}; _ -> {next_state, normal_state, StateData1} end. @@ -3894,6 +3900,10 @@ set_opts([{Opt, Val} | Opts], StateData) -> StateData#state{config = (StateData#state.config)#config{max_users = MaxUsers}}; + vcard -> + StateData#state{config = + (StateData#state.config)#config{vcard = + Val}}; affiliations -> StateData#state{affiliations = (?DICT):from_list(Val)}; subject -> StateData#state{subject = Val}; @@ -3926,6 +3936,7 @@ make_opts(StateData) -> ?MAKE_CONFIG_OPT(logging), ?MAKE_CONFIG_OPT(max_users), ?MAKE_CONFIG_OPT(allow_voice_requests), ?MAKE_CONFIG_OPT(voice_request_min_interval), + ?MAKE_CONFIG_OPT(vcard), {captcha_whitelist, (?SETS):to_list((StateData#state.config)#config.captcha_whitelist)}, {affiliations, @@ -3992,6 +4003,8 @@ process_iq_disco_info(_From, get, Lang, StateData) -> {<<"name">>, get_title(StateData)}], children = []}, #xmlel{name = <<"feature">>, + attrs = [{<<"var">>, ?NS_VCARD}], children = []}, + #xmlel{name = <<"feature">>, attrs = [{<<"var">>, ?NS_MUC}], children = []}, ?CONFIG_OPT_TO_FEATURE((Config#config.public), <<"muc_public">>, <<"muc_hidden">>), @@ -4064,6 +4077,26 @@ process_iq_captcha(_From, set, _Lang, SubEl, _ -> {error, ?ERR_NOT_ACCEPTABLE} end. +process_iq_vcard(_From, get, _Lang, _SubEl, StateData) -> + #state{config = #config{vcard = VCardRaw}} = StateData, + case xml_stream:parse_element(VCardRaw) of + #xmlel{children = VCardEls} -> + {result, VCardEls, StateData}; + {error, _} -> + {result, [], StateData} + end; +process_iq_vcard(From, set, Lang, SubEl, StateData) -> + case get_affiliation(From, StateData) of + owner -> + VCardRaw = xml:element_to_binary(SubEl), + Config = StateData#state.config, + NewConfig = Config#config{vcard = VCardRaw}, + change_config(NewConfig, StateData); + _ -> + ErrText = <<"Owner privileges required">>, + {error, ?ERRT_FORBIDDEN(Lang, ErrText)} + end. + get_title(StateData) -> case (StateData#state.config)#config.title of <<"">> -> StateData#state.room; diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 77d333bd7..fca227d31 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -870,7 +870,7 @@ count_offline_messages(LUser, LServer) -> case catch odbc_queries:count_records_where( LServer, "spool", <<"where username='", Username/binary, "'">>) of - {selected, [_], [{Res}]} -> + {selected, [_], [[Res]]} -> jlib:binary_to_integer(Res); _ -> 0 diff --git a/src/mod_pubsub.erl b/src/mod_pubsub.erl index 87d49cb54..168169a95 100644 --- a/src/mod_pubsub.erl +++ b/src/mod_pubsub.erl @@ -827,7 +827,7 @@ send_loop(State) -> end; (_) -> ok end, - Subscriptions) + lists:usort(Subscriptions)) end, State#state.plugins), if not State#state.ignore_pep_from_offline -> @@ -1166,22 +1166,21 @@ disco_items(Host, Node, From) -> %% presence hooks handling functions %% -caps_update(#jid{luser = U, lserver = S, lresource = R} = From, To, _Features) -> - Pid = ejabberd_sm:get_session_pid(U, S, R), - presence_probe(From, To, Pid). - -presence_probe(#jid{luser = User, lserver = Server, lresource = Resource} = JID, - JID, Pid) -> - presence(Server, {presence, JID, Pid}), - presence(Server, {presence, User, Server, [Resource], JID}); -presence_probe(#jid{luser = User, lserver = Server}, - #jid{luser = User, lserver = Server}, _Pid) -> - %% ignore presence_probe from other ressources for the current user - %% this way, we do not send duplicated last items if user already connected with other clients +caps_update(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Features) + when Host =/= S -> + presence(Host, {presence, U, S, [R], JID}); +caps_update(_From, _To, _Feature) -> + ok. + +presence_probe(#jid{luser = U, lserver = S, lresource = R} = JID, JID, Pid) -> + presence(S, {presence, JID, Pid}), + presence(S, {presence, U, S, [R], JID}); +presence_probe(#jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}, _Pid) -> + %% ignore presence_probe from my other ressources + %% to not get duplicated last items ok; -presence_probe(#jid{luser = User, lserver = Server, lresource = Resource}, - #jid{lserver = Host} = JID, _Pid) -> - presence(Host, {presence, User, Server, [Resource], JID}). +presence_probe(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Pid) -> + presence(Host, {presence, U, S, [R], JID}). presence(ServerHost, Presence) -> SendLoop = case @@ -1621,7 +1620,7 @@ command_disco_info(_Host, ?NS_PUBSUB_GET_PENDING, node_disco_info(Host, Node, From) -> node_disco_info(Host, Node, From, true, true). -node_disco_info(Host, Node, From, Identity, Features) -> +node_disco_info(Host, Node, From, _Identity, _Features) -> % Action = % fun(#pubsub_node{type = Type, id = NodeId}) -> % I = case Identity of @@ -2952,10 +2951,7 @@ publish_item(Host, ServerHost, Node, Publisher, ItemId, Payload, Access) -> PublishModel = get_option(Options, publish_model), DeliverPayloads = get_option(Options, deliver_payloads), PersistItems = get_option(Options, persist_items), - MaxItems = case PersistItems of - false -> 0; - true -> max_items(Host, Options) - end, + MaxItems = max_items(Host, Options), PayloadCount = payload_xmlelements(Payload), PayloadSize = byte_size(term_to_binary(Payload)) - 2, PayloadMaxSize = get_option(Options, max_payload_size), @@ -3359,6 +3355,8 @@ send_items(Host, Node, NodeId, Type, {U, S, R} = LJID, _ -> [] end, Stanza = case ToSend of + [] -> + undefined; [LastItem] -> {ModifNow, ModifUSR} = LastItem#pubsub_item.modification, @@ -3372,11 +3370,13 @@ send_items(Host, Node, NodeId, Type, {U, S, R} = LJID, attrs = nodeAttr(Node), children = itemsEls(ToSend)}]) end, - case is_tuple(Host) of - false -> + case {is_tuple(Host), Stanza} of + {_, undefined} -> + ok; + {false, _} -> ejabberd_router:route(service_jid(Host), jlib:make_jid(LJID), Stanza); - true -> + {true, _} -> case ejabberd_sm:get_session_pid(U, S, R) of C2SPid when is_pid(C2SPid) -> ejabberd_c2s:broadcast(C2SPid, @@ -4156,7 +4156,7 @@ presence_can_deliver({User, Server, Resource}, true) -> ({session, _, _ , _, undefined, _}, _Acc) -> false; ({session, _, {_, _, R}, _, _Priority, _}, _Acc) -> case Resource of - [] -> true; + <<>> -> true; R -> true; _ -> false end @@ -4442,15 +4442,7 @@ broadcast_stanza(Host, _Node, _NodeId, _Type, NodeOptions, SubsByDepth, NotifyTy broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, NodeId, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) -> broadcast_stanza({LUser, LServer, LResource}, Node, NodeId, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM), %% Handles implicit presence subscriptions - SenderResource = case LResource of - [] -> - case user_resources(LUser, LServer) of - [Resource|_] -> Resource; - _ -> <<>> - end; - _ -> - LResource - end, + SenderResource = user_resource(LUser, LServer, LResource), case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of C2SPid when is_pid(C2SPid) -> Stanza = case get_option(NodeOptions, notification_type, headline) of @@ -4461,8 +4453,8 @@ broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, NodeId, Type, Nod %% Also, add "replyto" if entity has presence subscription to the account owner %% See XEP-0163 1.1 section 4.3.1 ejabberd_c2s:broadcast(C2SPid, - {pep_message, binary_to_list(Node)++"+notify"}, - _Sender = jlib:make_jid(LUser, LServer, ""), + {pep_message, <<((Node))/binary, "+notify">>}, + _Sender = jlib:make_jid(LUser, LServer, <<"">>), _StanzaToSend = add_extended_headers(Stanza, _ReplyTo = extended_headers([jlib:jid_to_string(Publisher)]))); _ -> @@ -4527,6 +4519,13 @@ subscribed_nodes_by_jid(NotifyType, SubsByDepth) -> user_resources(User, Server) -> ejabberd_sm:get_user_resources(User, Server). +user_resource(User, Server, <<>>) -> + case user_resources(User, Server) of + [R | _] -> R; + _ -> <<>> + end; +user_resource(_, _, Resource) -> Resource. + %%%%%%% Configuration handling %%<p>There are several reasons why the default node configuration options request might fail:</p> diff --git a/src/mod_pubsub_odbc.erl b/src/mod_pubsub_odbc.erl index 00e619213..45c30a11b 100644 --- a/src/mod_pubsub_odbc.erl +++ b/src/mod_pubsub_odbc.erl @@ -475,7 +475,7 @@ send_loop(State) -> end; (_) -> ok end, - Subscriptions) + lists:usort(Subscriptions)) end, State#state.plugins), if not State#state.ignore_pep_from_offline -> @@ -817,22 +817,21 @@ disco_items(Host, Node, From) -> %% presence hooks handling functions %% -caps_update(#jid{luser = U, lserver = S, lresource = R} = From, To, _Features) -> - Pid = ejabberd_sm:get_session_pid(U, S, R), - presence_probe(From, To, Pid). - -presence_probe(#jid{luser = User, lserver = Server, lresource = Resource} = JID, - JID, Pid) -> - presence(Server, {presence, JID, Pid}), - presence(Server, {presence, User, Server, [Resource], JID}); -presence_probe(#jid{luser = User, lserver = Server}, - #jid{luser = User, lserver = Server}, _Pid) -> - %% ignore presence_probe from other ressources for the current user - %% this way, we do not send duplicated last items if user already connected with other clients +caps_update(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Features) + when Host =/= S -> + presence(Host, {presence, U, S, [R], JID}); +caps_update(_From, _To, _Feature) -> + ok. + +presence_probe(#jid{luser = U, lserver = S, lresource = R} = JID, JID, Pid) -> + presence(S, {presence, JID, Pid}), + presence(S, {presence, U, S, [R], JID}); +presence_probe(#jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}, _Pid) -> + %% ignore presence_probe from my other ressources + %% to not get duplicated last items ok; -presence_probe(#jid{luser = User, lserver = Server, lresource = Resource}, - #jid{lserver = Host} = JID, _Pid) -> - presence(Host, {presence, User, Server, [Resource], JID}). +presence_probe(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Pid) -> + presence(Host, {presence, U, S, [R], JID}). presence(ServerHost, Presence) -> SendLoop = case @@ -1273,7 +1272,7 @@ command_disco_info(_Host, ?NS_PUBSUB_GET_PENDING, node_disco_info(Host, Node, From) -> node_disco_info(Host, Node, From, true, true). -node_disco_info(Host, Node, From, Identity, Features) -> +node_disco_info(Host, Node, From, _Identity, _Features) -> % Action = % fun(#pubsub_node{type = Type, id = NodeId}) -> % I = case Identity of @@ -2618,9 +2617,9 @@ publish_item(Host, ServerHost, Node, Publisher, ItemId, Payload, Access) -> Features = features(Type), PublishFeature = lists:member(<<"publish">>, Features), PublishModel = get_option(Options, publish_model), - MaxItems = max_items(Host, Options), DeliverPayloads = get_option(Options, deliver_payloads), PersistItems = get_option(Options, persist_items), + MaxItems = max_items(Host, Options), PayloadCount = payload_xmlelements(Payload), PayloadSize = byte_size(term_to_binary(Payload)) - 2, PayloadMaxSize = get_option(Options, max_payload_size), @@ -3013,7 +3012,7 @@ send_items(Host, Node, NodeId, Type, LJID, last) -> ModifNow, ModifUSR) end, ejabberd_router:route(service_jid(Host), jlib:make_jid(LJID), Stanza); -send_items(Host, Node, NodeId, Type, LJID, Number) -> +send_items(Host, Node, NodeId, Type, {U, S, R} = LJID, Number) -> ToSend = case node_action(Host, Type, get_items, [NodeId, LJID]) of @@ -3026,6 +3025,8 @@ send_items(Host, Node, NodeId, Type, LJID, Number) -> _ -> [] end, Stanza = case ToSend of + [] -> + undefined; [LastItem] -> {ModifNow, ModifUSR} = LastItem#pubsub_item.modification, @@ -3039,7 +3040,22 @@ send_items(Host, Node, NodeId, Type, LJID, Number) -> attrs = nodeAttr(Node), children = itemsEls(ToSend)}]) end, - ejabberd_router:route(service_jid(Host), jlib:make_jid(LJID), Stanza). + case {is_tuple(Host), Stanza} of + {_, undefined} -> + ok; + {false, _} -> + ejabberd_router:route(service_jid(Host), + jlib:make_jid(LJID), Stanza); + {true, _} -> + case ejabberd_sm:get_session_pid(U, S, R) of + C2SPid when is_pid(C2SPid) -> + ejabberd_c2s:broadcast(C2SPid, + {pep_message, + <<((Node))/binary, "+notify">>}, + _Sender = service_jid(Host), Stanza); + _ -> ok + end + end. %% @spec (Host, JID, Plugins) -> {error, Reason} | {result, Response} %% Host = host() @@ -3235,8 +3251,7 @@ set_affiliations(Host, Node, From, EntitiesEls) -> error -> {error, ?ERR_BAD_REQUEST}; _ -> Action = fun (#pubsub_node{type = Type, - id = NodeId} = - N) -> + id = NodeId}) -> Owners = node_owners_call(Type, NodeId), case lists:member(Owner, Owners) of true -> @@ -4062,15 +4077,7 @@ broadcast_stanza(Host, _Node, _NodeId, _Type, NodeOptions, SubsByDepth, NotifyTy broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, NodeId, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) -> broadcast_stanza({LUser, LServer, LResource}, Node, NodeId, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM), %% Handles implicit presence subscriptions - SenderResource = case LResource of - [] -> - case user_resources(LUser, LServer) of - [Resource|_] -> Resource; - _ -> <<>> - end; - _ -> - LResource - end, + SenderResource = user_resource(LUser, LServer, LResource), case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of C2SPid when is_pid(C2SPid) -> Stanza = case get_option(NodeOptions, notification_type, headline) of @@ -4081,8 +4088,8 @@ broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, NodeId, Type, Nod %% Also, add "replyto" if entity has presence subscription to the account owner %% See XEP-0163 1.1 section 4.3.1 ejabberd_c2s:broadcast(C2SPid, - {pep_message, binary_to_list(Node)++"+notify"}, - _Sender = jlib:make_jid(LUser, LServer, ""), + {pep_message, <<((Node))/binary, "+notify">>}, + _Sender = jlib:make_jid(LUser, LServer, <<"">>), _StanzaToSend = add_extended_headers(Stanza, _ReplyTo = extended_headers([jlib:jid_to_string(Publisher)]))); _ -> @@ -4147,6 +4154,13 @@ subscribed_nodes_by_jid(NotifyType, SubsByDepth) -> user_resources(User, Server) -> ejabberd_sm:get_user_resources(User, Server). +user_resource(User, Server, <<>>) -> + case user_resources(User, Server) of + [R | _] -> R; + _ -> <<>> + end; +user_resource(_, _, Resource) -> Resource. + %%%%%%% Configuration handling %%<p>There are several reasons why the default node configuration options request might fail:</p> diff --git a/src/mod_sip.erl b/src/mod_sip.erl new file mode 100644 index 000000000..8ed4ed8cf --- /dev/null +++ b/src/mod_sip.erl @@ -0,0 +1,279 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% @copyright (C) 2014, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 21 Apr 2014 by Evgeny Khramtsov <ekhramtsov@process-one.net> +%%%------------------------------------------------------------------- +-module(mod_sip). + +-behaviour(gen_mod). +-behaviour(esip). + +%% API +-export([start/2, stop/1, prepare_request/1, make_response/2, at_my_host/1]). + +%% esip_callbacks +-export([data_in/2, data_out/2, message_in/2, message_out/2, + request/2, request/3, response/2, locate/1]). + +-include("ejabberd.hrl"). +-include("logger.hrl"). +-include("esip.hrl"). + +%%%=================================================================== +%%% API +%%%=================================================================== +start(_Host, _Opts) -> + ejabberd:start_app(esip), + esip:set_config_value(max_server_transactions, 10000), + esip:set_config_value(max_client_transactions, 10000), + esip:set_config_value(software, <<"ejabberd ", (?VERSION)/binary>>), + esip:set_config_value(module, ?MODULE), + Spec = {mod_sip_registrar, {mod_sip_registrar, start_link, []}, + transient, 2000, worker, [mod_sip_registrar]}, + TmpSupSpec = {mod_sip_proxy_sup, + {ejabberd_tmp_sup, start_link, + [mod_sip_proxy_sup, mod_sip_proxy]}, + permanent, infinity, supervisor, [ejabberd_tmp_sup]}, + supervisor:start_child(ejabberd_sup, Spec), + supervisor:start_child(ejabberd_sup, TmpSupSpec), + ok. + +stop(_Host) -> + ok. + +data_in(Data, #sip_socket{type = Transport, + addr = {MyIP, MyPort}, + peer = {PeerIP, PeerPort}}) -> + ?DEBUG( + "SIP [~p/in] ~s:~p -> ~s:~p:~n~s", + [Transport, inet_parse:ntoa(PeerIP), PeerPort, + inet_parse:ntoa(MyIP), MyPort, Data]). + +data_out(Data, #sip_socket{type = Transport, + addr = {MyIP, MyPort}, + peer = {PeerIP, PeerPort}}) -> + ?DEBUG( + "SIP [~p/out] ~s:~p -> ~s:~p:~n~s", + [Transport, inet_parse:ntoa(MyIP), MyPort, + inet_parse:ntoa(PeerIP), PeerPort, Data]). + +message_in(#sip{type = request, method = M} = Req, SIPSock) + when M /= <<"ACK">>, M /= <<"CANCEL">> -> + case action(Req, SIPSock) of + {relay, _LServer} -> + ok; + Action -> + request(Req, SIPSock, undefined, Action) + end; +message_in(_, _) -> + ok. + +message_out(_, _) -> + ok. + +response(_Resp, _SIPSock) -> + ok. + +request(_Req, _SIPSock) -> + error. + +request(Req, SIPSock, TrID) -> + request(Req, SIPSock, TrID, action(Req, SIPSock)). + +request(Req, SIPSock, TrID, Action) -> + case Action of + to_me -> + process(Req, SIPSock); + register -> + mod_sip_registrar:request(Req, SIPSock); + loop -> + make_response(Req, #sip{status = 483, type = response}); + {unsupported, Require} -> + make_response(Req, #sip{status = 420, + type = response, + hdrs = [{'unsupported', + Require}]}); + {relay, LServer} -> + case mod_sip_proxy:start(LServer, []) of + {ok, Pid} -> + mod_sip_proxy:route(Req, SIPSock, TrID, Pid), + {mod_sip_proxy, route, [Pid]}; + Err -> + ?INFO_MSG("failed to proxy request ~p: ~p", [Req, Err]), + Err + end; + {proxy_auth, Host} -> + make_response( + Req, + #sip{status = 407, + type = response, + hdrs = [{'proxy-authenticate', + make_auth_hdr(Host)}]}); + {auth, Host} -> + make_response( + Req, + #sip{status = 401, + type = response, + hdrs = [{'www-authenticate', + make_auth_hdr(Host)}]}); + deny -> + make_response(Req, #sip{status = 403, + type = response}); + not_found -> + make_response(Req, #sip{status = 480, + type = response}) + end. + +locate(_SIPMsg) -> + ok. + +find(#uri{user = User, host = Host}) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Host), + if LUser == <<"">> -> + to_me; + true -> + case mod_sip_registrar:find_sockets(LUser, LServer) of + [] -> + not_found; + [_|_] -> + {relay, LServer} + end + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +action(#sip{method = <<"REGISTER">>, type = request, hdrs = Hdrs, + uri = #uri{user = <<"">>} = URI} = Req, SIPSock) -> + case at_my_host(URI) of + true -> + case esip:get_hdrs('require', Hdrs) of + [_|_] = Require -> + {unsupported, Require}; + _ -> + {_, ToURI, _} = esip:get_hdr('to', Hdrs), + case at_my_host(ToURI) of + true -> + case check_auth(Req, 'authorization', SIPSock) of + true -> + register; + false -> + {auth, ToURI#uri.host} + end; + false -> + deny + end + end; + false -> + deny + end; +action(#sip{method = Method, hdrs = Hdrs, type = request} = Req, SIPSock) -> + case esip:get_hdr('max-forwards', Hdrs) of + 0 when Method == <<"OPTIONS">> -> + to_me; + 0 -> + loop; + _ -> + case esip:get_hdrs('proxy-require', Hdrs) of + [_|_] = Require -> + {unsupported, Require}; + _ -> + {_, ToURI, _} = esip:get_hdr('to', Hdrs), + {_, FromURI, _} = esip:get_hdr('from', Hdrs), + case at_my_host(FromURI) of + true -> + case check_auth(Req, 'proxy-authorization', SIPSock) of + true -> + case at_my_host(ToURI) of + true -> + find(ToURI); + false -> + LServer = jlib:nameprep(FromURI#uri.host), + {relay, LServer} + end; + false -> + {proxy_auth, FromURI#uri.host} + end; + false -> + case at_my_host(ToURI) of + true -> + find(ToURI); + false -> + deny + end + end + end + end. + +check_auth(#sip{method = <<"CANCEL">>}, _, _SIPSock) -> + true; +check_auth(#sip{method = Method, hdrs = Hdrs, body = Body}, AuthHdr, _SIPSock) -> + Issuer = case AuthHdr of + 'authorization' -> + to; + 'proxy-authorization' -> + from + end, + {_, #uri{user = User, host = Host}, _} = esip:get_hdr(Issuer, Hdrs), + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Host), + case lists:filter( + fun({_, Params}) -> + Username = esip:get_param(<<"username">>, Params), + Realm = esip:get_param(<<"realm">>, Params), + (LUser == esip:unquote(Username)) + and (LServer == esip:unquote(Realm)) + end, esip:get_hdrs(AuthHdr, Hdrs)) of + [Auth|_] -> + case ejabberd_auth:get_password_s(LUser, LServer) of + <<"">> -> + false; + Password -> + esip:check_auth(Auth, Method, Body, Password) + end; + [] -> + false + end. + +allow() -> + [<<"OPTIONS">>, <<"REGISTER">>]. + +process(#sip{method = <<"OPTIONS">>} = Req, _) -> + make_response(Req, #sip{type = response, status = 200, + hdrs = [{'allow', allow()}]}); +process(#sip{method = <<"REGISTER">>} = Req, _) -> + make_response(Req, #sip{type = response, status = 400}); +process(Req, _) -> + make_response(Req, #sip{type = response, status = 405, + hdrs = [{'allow', allow()}]}). + +prepare_request(#sip{hdrs = Hdrs1} = Req) -> + MF = esip:get_hdr('max-forwards', Hdrs1), + Hdrs2 = esip:set_hdr('max-forwards', MF-1, Hdrs1), + Hdrs3 = lists:filter( + fun({'proxy-authorization', {_, Params}}) -> + Realm = esip:unquote(esip:get_param(<<"realm">>, Params)), + not is_my_host(jlib:nameprep(Realm)); + (_) -> + true + end, Hdrs2), + Req#sip{hdrs = Hdrs3}. + +make_auth_hdr(LServer) -> + Realm = jlib:nameprep(LServer), + {<<"Digest">>, [{<<"realm">>, esip:quote(Realm)}, + {<<"qop">>, esip:quote(<<"auth">>)}, + {<<"nonce">>, esip:quote(esip:make_hexstr(20))}]}. + +make_response(Req, Resp) -> + esip:make_response(Req, Resp, esip:make_tag()). + +at_my_host(#uri{host = Host}) -> + is_my_host(jlib:nameprep(Host)). + +is_my_host(LServer) -> + gen_mod:is_loaded(LServer, ?MODULE). diff --git a/src/mod_sip_proxy.erl b/src/mod_sip_proxy.erl new file mode 100644 index 000000000..cae75bff8 --- /dev/null +++ b/src/mod_sip_proxy.erl @@ -0,0 +1,277 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% @copyright (C) 2014, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 21 Apr 2014 by Evgeny Khramtsov <ekhramtsov@process-one.net> +%%%------------------------------------------------------------------- +-module(mod_sip_proxy). + +-define(GEN_FSM, p1_fsm). +-behaviour(?GEN_FSM). + +%% API +-export([start/2, start_link/2, route/4]). + +%% gen_fsm callbacks +-export([init/1, wait_for_request/2, wait_for_response/2, + handle_event/3, handle_sync_event/4, + handle_info/3, terminate/3, code_change/4]). + +-include("ejabberd.hrl"). +-include("logger.hrl"). +-include("esip.hrl"). + +-record(state, {host = <<"">> :: binary(), + opts = [] :: [{certfile, binary()}], + orig_trid, + responses = [] :: [#sip{}], + tr_ids = [] :: list(), + orig_req :: #sip{}}). + +%%%=================================================================== +%%% API +%%%=================================================================== +start(LServer, Opts) -> + supervisor:start_child(mod_sip_proxy_sup, [LServer, Opts]). + +start_link(LServer, Opts) -> + ?GEN_FSM:start_link(?MODULE, [LServer, Opts], []). + +route(SIPMsg, _SIPSock, TrID, Pid) -> + ?GEN_FSM:send_event(Pid, {SIPMsg, TrID}). + +%%%=================================================================== +%%% gen_fsm callbacks +%%%=================================================================== +init([Host, Opts]) -> + Opts1 = add_certfile(Host, Opts), + {ok, wait_for_request, #state{opts = Opts1, host = Host}}. + +wait_for_request({#sip{type = request} = Req, TrID}, State) -> + Opts = State#state.opts, + Req1 = mod_sip:prepare_request(Req), + case connect(Req1, Opts) of + {ok, SIPSockets} -> + NewState = + lists:foldl( + fun(_SIPSocket, {error, _} = Err) -> + Err; + (SIPSocket, #state{tr_ids = TrIDs} = AccState) -> + Req2 = add_via(SIPSocket, State#state.host, Req1), + case esip:request(SIPSocket, Req2, + {?MODULE, route, [self()]}) of + {ok, ClientTrID} -> + NewTrIDs = [ClientTrID|TrIDs], + AccState#state{tr_ids = NewTrIDs}; + Err -> + cancel_pending_transactions(AccState), + Err + end + end, State, SIPSockets), + case NewState of + {error, _} = Err -> + {Status, Reason} = esip:error_status(Err), + esip:reply(TrID, mod_sip:make_response( + Req, #sip{type = response, + status = Status, + reason = Reason})), + {stop, normal, State}; + _ -> + {next_state, wait_for_response, + NewState#state{orig_req = Req, orig_trid = TrID}} + end; + {error, notfound} -> + esip:reply(TrID, mod_sip:make_response( + Req, #sip{type = response, + status = 480, + reason = esip:reason(480)})), + {stop, normal, State}; + Err -> + {Status, Reason} = esip:error_status(Err), + esip:reply(TrID, mod_sip:make_response( + Req, #sip{type = response, + status = Status, + reason = Reason})), + {stop, normal, State} + end; +wait_for_request(_Event, State) -> + {next_state, wait_for_request, State}. + +wait_for_response({#sip{method = <<"CANCEL">>, type = request}, _TrID}, State) -> + cancel_pending_transactions(State), + {next_state, wait_for_response, State}; +wait_for_response({Resp, TrID}, + #state{orig_req = #sip{method = Method} = Req} = State) -> + case Resp of + {error, timeout} when Method /= <<"INVITE">> -> + %% Absorb useless 408. See RFC4320 + choose_best_response(State), + esip:stop_transaction(State#state.orig_trid), + {stop, normal, State}; + {error, _} -> + {Status, Reason} = esip:error_status(Resp), + State1 = mark_transaction_as_complete(TrID, State), + SIPResp = mod_sip:make_response(Req, + #sip{type = response, + status = Status, + reason = Reason}), + State2 = collect_response(SIPResp, State1), + case State2#state.tr_ids of + [] -> + choose_best_response(State2), + {stop, normal, State2}; + _ -> + {next_state, wait_for_response, State2} + end; + #sip{status = 100} -> + {next_state, wait_for_response, State}; + #sip{status = Status} -> + {[_|Vias], NewHdrs} = esip:split_hdrs('via', Resp#sip.hdrs), + NewResp = case Vias of + [] -> + Resp#sip{hdrs = NewHdrs}; + _ -> + Resp#sip{hdrs = [{'via', Vias}|NewHdrs]} + end, + if Status < 300 -> + esip:reply(State#state.orig_trid, NewResp); + true -> + ok + end, + State1 = if Status >= 200 -> + mark_transaction_as_complete(TrID, State); + true -> + State + end, + State2 = if Status >= 300 -> + collect_response(NewResp, State1); + true -> + State1 + end, + if Status >= 600 -> + cancel_pending_transactions(State2); + true -> + ok + end, + case State2#state.tr_ids of + [] -> + choose_best_response(State2), + {stop, normal, State2}; + _ -> + {next_state, wait_for_response, State2} + end + end; +wait_for_response(_Event, State) -> + {next_state, wait_for_response, State}. + +handle_event(_Event, StateName, State) -> + {next_state, StateName, State}. + +handle_sync_event(_Event, _From, StateName, State) -> + Reply = ok, + {reply, Reply, StateName, State}. + +handle_info(_Info, StateName, State) -> + {next_state, StateName, State}. + +terminate(_Reason, _StateName, _State) -> + ok. + +code_change(_OldVsn, StateName, State, _Extra) -> + {ok, StateName, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +connect(#sip{hdrs = Hdrs} = Req, Opts) -> + {_, ToURI, _} = esip:get_hdr('to', Hdrs), + case mod_sip:at_my_host(ToURI) of + true -> + LUser = jlib:nodeprep(ToURI#uri.user), + LServer = jlib:nameprep(ToURI#uri.host), + case mod_sip_registrar:find_sockets(LUser, LServer) of + [_|_] = SIPSocks -> + {ok, SIPSocks}; + [] -> + {error, notfound} + end; + false -> + case esip:connect(Req, Opts) of + {ok, SIPSock} -> + {ok, [SIPSock]}; + {error, _} = Err -> + Err + end + end. + +cancel_pending_transactions(State) -> + lists:foreach(fun esip:cancel/1, State#state.tr_ids). + +add_certfile(LServer, Opts) -> + case ejabberd_config:get_option({domain_certfile, LServer}, + fun iolist_to_binary/1) of + CertFile when is_binary(CertFile), CertFile /= <<"">> -> + [{certfile, CertFile}|Opts]; + _ -> + Opts + end. + +add_via(#sip_socket{type = Transport}, LServer, #sip{hdrs = Hdrs} = Req) -> + ConfiguredVias = get_configured_vias(LServer), + {ViaHost, ViaPort} = proplists:get_value( + Transport, ConfiguredVias, {LServer, undefined}), + ViaTransport = case Transport of + tls -> <<"TLS">>; + tcp -> <<"TCP">>; + udp -> <<"UDP">> + end, + Via = #via{transport = ViaTransport, + host = ViaHost, + port = ViaPort, + params = [{<<"branch">>, esip:make_branch()}, + {<<"rport">>, <<"">>}]}, + Req#sip{hdrs = [{'via', [Via]}|Hdrs]}. + +get_configured_vias(LServer) -> + gen_mod:get_module_opt( + LServer, ?MODULE, via, + fun(L) -> + lists:map( + fun(Opts) -> + Type = proplists:get_value(type, Opts), + Host = proplists:get_value(host, Opts), + Port = proplists:get_value(port, Opts), + true = (Type == tcp) or (Type == tls) or (Type == udp), + true = is_binary(Host) and (Host /= <<"">>), + true = (is_integer(Port) + and (Port > 0) and (Port < 65536)) + or (Port == undefined), + {Type, {Host, Port}} + end, L) + end, []). + +mark_transaction_as_complete(TrID, State) -> + NewTrIDs = lists:delete(TrID, State#state.tr_ids), + State#state{tr_ids = NewTrIDs}. + +collect_response(Resp, #state{responses = Resps} = State) -> + State#state{responses = [Resp|Resps]}. + +choose_best_response(#state{responses = Responses} = State) -> + SortedResponses = lists:keysort(#sip.status, Responses), + case lists:filter( + fun(#sip{status = Status}) -> + Status >= 600 + end, SortedResponses) of + [Resp|_] -> + esip:reply(State#state.orig_trid, Resp); + [] -> + case SortedResponses of + [Resp|_] -> + esip:reply(State#state.orig_trid, Resp); + [] -> + ok + end + end. diff --git a/src/mod_sip_registrar.erl b/src/mod_sip_registrar.erl new file mode 100644 index 000000000..57c55be08 --- /dev/null +++ b/src/mod_sip_registrar.erl @@ -0,0 +1,336 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% @copyright (C) 2014, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 23 Apr 2014 by Evgeny Khramtsov <ekhramtsov@process-one.net> +%%%------------------------------------------------------------------- +-module(mod_sip_registrar). + +-define(GEN_SERVER, p1_server). +-behaviour(?GEN_SERVER). + +%% API +-export([start_link/0, request/2, find_sockets/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("ejabberd.hrl"). +-include("logger.hrl"). +-include("esip.hrl"). + +-define(CALL_TIMEOUT, timer:seconds(30)). + +-record(binding, {socket = #sip_socket{}, + call_id = <<"">> :: binary(), + cseq = 0 :: non_neg_integer(), + timestamp = now() :: erlang:timestamp(), + tref = make_ref() :: reference(), + expires = 0 :: non_neg_integer()}). + +-record(sip_session, {us = {<<"">>, <<"">>} :: {binary(), binary()}, + bindings = [] :: [#binding{}]}). + +-record(state, {}). + +%%%=================================================================== +%%% API +%%%=================================================================== +start_link() -> + ?GEN_SERVER:start_link({local, ?MODULE}, ?MODULE, [], []). + +request(#sip{hdrs = Hdrs} = Req, SIPSock) -> + {_, #uri{user = U, host = S}, _} = esip:get_hdr('to', Hdrs), + LUser = jlib:nodeprep(U), + LServer = jlib:nameprep(S), + {PeerIP, _} = SIPSock#sip_socket.peer, + US = {LUser, LServer}, + CallID = esip:get_hdr('call-id', Hdrs), + CSeq = esip:get_hdr('cseq', Hdrs), + Expires = esip:get_hdr('expires', Hdrs, 0), + case esip:get_hdrs('contact', Hdrs) of + [<<"*">>] when Expires == 0 -> + case unregister_session(US, SIPSock, CallID, CSeq) of + ok -> + ?INFO_MSG("unregister SIP session for user ~s@~s from ~s", + [LUser, LServer, inet_parse:ntoa(PeerIP)]), + mod_sip:make_response( + Req, #sip{type = response, status = 200}); + {error, Why} -> + {Status, Reason} = make_status(Why), + mod_sip:make_response( + Req, #sip{type = response, + status = Status, + reason = Reason}) + end; + [{_, _URI, _Params}|_] = Contacts -> + ExpiresList = lists:map( + fun({_, _, Params}) -> + case to_integer( + esip:get_param( + <<"expires">>, Params), + 0, (1 bsl 32)-1) of + {ok, E} -> E; + _ -> Expires + end + end, Contacts), + Expires1 = lists:max(ExpiresList), + Contact = {<<"">>, #uri{user = LUser, host = LServer}, + [{<<"expires">>, jlib:integer_to_binary(Expires1)}]}, + MinExpires = min_expires(), + if Expires1 >= MinExpires -> + case register_session(US, SIPSock, CallID, CSeq, Expires1) of + ok -> + ?INFO_MSG("register SIP session for user ~s@~s from ~s", + [LUser, LServer, inet_parse:ntoa(PeerIP)]), + mod_sip:make_response( + Req, + #sip{type = response, + status = 200, + hdrs = [{'contact', [Contact]}]}); + {error, Why} -> + {Status, Reason} = make_status(Why), + mod_sip:make_response( + Req, #sip{type = response, + status = Status, + reason = Reason}) + end; + Expires1 > 0, Expires1 < MinExpires -> + mod_sip:make_response( + Req, #sip{type = response, + status = 423, + hdrs = [{'min-expires', MinExpires}]}); + true -> + case unregister_session(US, SIPSock, CallID, CSeq) of + ok -> + ?INFO_MSG("unregister SIP session for user ~s@~s from ~s", + [LUser, LServer, inet_parse:ntoa(PeerIP)]), + mod_sip:make_response( + Req, + #sip{type = response, status = 200, + hdrs = [{'contact', [Contact]}]}); + {error, Why} -> + {Status, Reason} = make_status(Why), + mod_sip:make_response( + Req, #sip{type = response, + status = Status, + reason = Reason}) + end + end; + [] -> + case mnesia:dirty_read(sip_session, US) of + [#sip_session{bindings = Bindings}] -> + case pop_previous_binding(SIPSock, Bindings) of + {ok, #binding{expires = Expires1}, _} -> + Contact = {<<"">>, + #uri{user = LUser, host = LServer}, + [{<<"expires">>, + jlib:integer_to_binary(Expires1)}]}, + mod_sip:make_response( + Req, #sip{type = response, status = 200, + hdrs = [{'contact', [Contact]}]}); + {error, notfound} -> + {Status, Reason} = make_status(notfound), + mod_sip:make_response( + Req, #sip{type = response, + status = Status, + reason = Reason}) + end; + [] -> + {Status, Reason} = make_status(notfound), + mod_sip:make_response( + Req, #sip{type = response, + status = Status, + reason = Reason}) + end; + _ -> + mod_sip:make_response(Req, #sip{type = response, status = 400}) + end. + +find_sockets(U, S) -> + case mnesia:dirty_read(sip_session, {U, S}) of + [#sip_session{bindings = Bindings}] -> + [Binding#binding.socket || Binding <- Bindings]; + [] -> + [] + end. + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== +init([]) -> + mnesia:create_table(sip_session, + [{ram_copies, [node()]}, + {attributes, record_info(fields, sip_session)}]), + mnesia:add_table_copy(sip_session, node(), ram_copies), + {ok, #state{}}. + +handle_call({write, Session}, _From, State) -> + Res = write_session(Session), + {reply, Res, State}; +handle_call({delete, US, SIPSocket, CallID, CSeq}, _From, State) -> + Res = delete_session(US, SIPSocket, CallID, CSeq), + {reply, Res, State}; +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({write, Session}, State) -> + write_session(Session), + {noreply, State}; +handle_info({delete, US, SIPSocket, CallID, CSeq}, State) -> + delete_session(US, SIPSocket, CallID, CSeq), + {noreply, State}; +handle_info({timeout, TRef, US}, State) -> + delete_expired_session(US, TRef), + {noreply, State}; +handle_info(_Info, State) -> + ?ERROR_MSG("got unexpected info: ~p", [_Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +register_session(US, SIPSocket, CallID, CSeq, Expires) -> + Session = #sip_session{us = US, + bindings = [#binding{socket = SIPSocket, + call_id = CallID, + cseq = CSeq, + timestamp = now(), + expires = Expires}]}, + call({write, Session}). + +unregister_session(US, SIPSocket, CallID, CSeq) -> + Msg = {delete, US, SIPSocket, CallID, CSeq}, + call(Msg). + +write_session(#sip_session{us = {U, S} = US, + bindings = [#binding{socket = SIPSocket, + call_id = CallID, + expires = Expires, + cseq = CSeq} = Binding]}) -> + case mnesia:dirty_read(sip_session, US) of + [#sip_session{bindings = Bindings}] -> + case pop_previous_binding(SIPSocket, Bindings) of + {ok, #binding{call_id = CallID, cseq = PrevCSeq}, _} + when PrevCSeq > CSeq -> + {error, cseq_out_of_order}; + {ok, #binding{tref = Tref}, Bindings1} -> + erlang:cancel_timer(Tref), + NewTRef = erlang:start_timer(Expires * 1000, self(), US), + NewBindings = [Binding#binding{tref = NewTRef}|Bindings1], + mnesia:dirty_write( + #sip_session{us = US, bindings = NewBindings}); + {error, notfound} -> + MaxSessions = ejabberd_sm:get_max_user_sessions(U, S), + if length(Bindings) < MaxSessions -> + NewTRef = erlang:start_timer(Expires * 1000, self(), US), + NewBindings = [Binding#binding{tref = NewTRef}|Bindings], + mnesia:dirty_write( + #sip_session{us = US, bindings = NewBindings}); + true -> + {error, too_many_sessions} + end + end; + [] -> + NewTRef = erlang:start_timer(Expires * 1000, self(), US), + NewBindings = [Binding#binding{tref = NewTRef}], + mnesia:dirty_write(#sip_session{us = US, bindings = NewBindings}) + end. + +delete_session(US, SIPSocket, CallID, CSeq) -> + case mnesia:dirty_read(sip_session, US) of + [#sip_session{bindings = Bindings}] -> + case pop_previous_binding(SIPSocket, Bindings) of + {ok, #binding{call_id = CallID, cseq = PrevCSeq}, _} + when PrevCSeq > CSeq -> + {error, cseq_out_of_order}; + {ok, #binding{tref = TRef}, []} -> + erlang:cancel_timer(TRef), + mnesia:dirty_delete(sip_session, US); + {ok, #binding{tref = TRef}, NewBindings} -> + erlang:cancel_timer(TRef), + mnesia:dirty_write(sip_session, + #sip_session{us = US, + bindings = NewBindings}); + {error, notfound} -> + {error, notfound} + end; + [] -> + {error, notfound} + end. + +delete_expired_session(US, TRef) -> + case mnesia:dirty_read(sip_session, US) of + [#sip_session{bindings = Bindings}] -> + case lists:filter( + fun(#binding{tref = TRef1}) when TRef1 == TRef -> + false; + (_) -> + true + end, Bindings) of + [] -> + mnesia:dirty_delete(sip_session, US); + NewBindings -> + mnesia:dirty_write(sip_session, + #sip_session{us = US, + bindings = NewBindings}) + end; + [] -> + ok + end. + +min_expires() -> + 60. + +to_integer(Bin, Min, Max) -> + case catch list_to_integer(binary_to_list(Bin)) of + N when N >= Min, N =< Max -> + {ok, N}; + _ -> + error + end. + +pop_previous_binding(#sip_socket{peer = Peer}, Bindings) -> + case lists:partition( + fun(#binding{socket = #sip_socket{peer = Peer1}}) -> + Peer1 == Peer + end, Bindings) of + {[Binding], RestBindings} -> + {ok, Binding, RestBindings}; + _ -> + {error, notfound} + end. + +call(Msg) -> + case catch ?GEN_SERVER:call(?MODULE, Msg, ?CALL_TIMEOUT) of + {'EXIT', {timeout, _}} -> + {error, timeout}; + {'EXIT', Why} -> + {error, Why}; + Reply -> + Reply + end. + +make_status(notfound) -> + {404, esip:reason(404)}; +make_status(cseq_out_of_order) -> + {500, <<"CSeq is Out of Order">>}; +make_status(timeout) -> + {408, esip:reason(408)}; +make_status(too_many_sessions) -> + {503, <<"Too Many Registered Sessions">>}; +make_status(_) -> + {500, esip:reason(500)}. |