diff options
Diffstat (limited to 'test/suite.erl')
-rw-r--r-- | test/suite.erl | 547 |
1 files changed, 445 insertions, 102 deletions
diff --git a/test/suite.erl b/test/suite.erl index c5593c4cf..3d832dd59 100644 --- a/test/suite.erl +++ b/test/suite.erl @@ -13,6 +13,7 @@ -include("suite.hrl"). -include_lib("kernel/include/file.hrl"). +-include("mod_roster.hrl"). %%%=================================================================== %%% API @@ -27,14 +28,22 @@ init_config(Config) -> SASLPath = filename:join([PrivDir, "sasl.log"]), MnesiaDir = filename:join([PrivDir, "mnesia"]), CertFile = filename:join([DataDir, "cert.pem"]), + SelfSignedCertFile = filename:join([DataDir, "self-signed-cert.pem"]), + CAFile = filename:join([DataDir, "ca.pem"]), {ok, CWD} = file:get_cwd(), {ok, _} = file:copy(CertFile, filename:join([CWD, "cert.pem"])), + {ok, _} = file:copy(SelfSignedCertFile, + filename:join([CWD, "self-signed-cert.pem"])), + {ok, _} = file:copy(CAFile, filename:join([CWD, "ca.pem"])), {ok, CfgContentTpl} = file:read_file(ConfigPathTpl), + Password = <<"password!@#$%^&*()'\"`~<>+-/;:_=[]{}|\\">>, CfgContent = process_config_tpl(CfgContentTpl, [ {c2s_port, 5222}, {loglevel, 4}, {s2s_port, 5269}, + {component_port, 5270}, {web_port, 5280}, + {password, Password}, {mysql_server, <<"localhost">>}, {mysql_port, 3306}, {mysql_db, <<"ejabberd_test">>}, @@ -58,17 +67,35 @@ init_config(Config) -> application:set_env(mnesia, dir, MnesiaDir), [{server_port, ct:get_config(c2s_port, 5222)}, {server_host, "localhost"}, + {component_port, ct:get_config(component_port, 5270)}, + {s2s_port, ct:get_config(s2s_port, 5269)}, {server, ?COMMON_VHOST}, {user, <<"test_single!#$%^*()`~+-;_=[]{}|\\">>}, + {nick, <<"nick!@#$%^&*()'\"`~<>+-/;:_=[]{}|\\">>}, {master_nick, <<"master_nick!@#$%^&*()'\"`~<>+-/;:_=[]{}|\\">>}, {slave_nick, <<"slave_nick!@#$%^&*()'\"`~<>+-/;:_=[]{}|\\">>}, {room_subject, <<"hello, world!@#$%^&*()'\"`~<>+-/;:_=[]{}|\\">>}, {certfile, CertFile}, + {persistent_room, true}, + {anonymous, false}, + {type, client}, + {xmlns, ?NS_CLIENT}, + {ns_stream, ?NS_STREAM}, + {stream_version, {1, 0}}, + {stream_id, <<"">>}, + {stream_from, <<"">>}, + {db_xmlns, <<"">>}, + {mechs, []}, + {rosterver, false}, + {lang, <<"en">>}, {base_dir, BaseDir}, + {socket, undefined}, + {pubsub_node, <<"node!@#$%^&*()'\"`~<>+-/;:_=[]{}|\\">>}, + {pubsub_node_title, <<"title!@#$%^&*()'\"`~<>+-/;:_=[]{}|\\">>}, {resource, <<"resource!@#$%^&*()'\"`~<>+-/;:_=[]{}|\\">>}, {master_resource, <<"master_resource!@#$%^&*()'\"`~<>+-/;:_=[]{}|\\">>}, {slave_resource, <<"slave_resource!@#$%^&*()'\"`~<>+-/;:_=[]{}|\\">>}, - {password, <<"password!@#$%^&*()'\"`~<>+-/;:_=[]{}|\\">>}, + {password, Password}, {backends, get_config_backends()} |Config]. @@ -115,47 +142,104 @@ process_config_tpl(Content, [{Name, DefaultValue} | Rest]) -> V3 -> V3 end, - NewContent = binary:replace(Content, <<"@@",(atom_to_binary(Name, latin1))/binary, "@@">>, Val), + NewContent = binary:replace(Content, + <<"@@",(atom_to_binary(Name,latin1))/binary, "@@">>, + Val, [global]), process_config_tpl(NewContent, Rest). +stream_header(Config) -> + To = case ?config(server, Config) of + <<"">> -> undefined; + Server -> jid:make(Server) + end, + From = case ?config(stream_from, Config) of + <<"">> -> undefined; + Frm -> jid:make(Frm) + end, + #stream_start{to = To, + from = From, + lang = ?config(lang, Config), + version = ?config(stream_version, Config), + xmlns = ?config(xmlns, Config), + db_xmlns = ?config(db_xmlns, Config), + stream_xmlns = ?config(ns_stream, Config)}. connect(Config) -> - {ok, Sock} = ejabberd_socket:connect( - ?config(server_host, Config), - ?config(server_port, Config), - [binary, {packet, 0}, {active, false}]), - init_stream(set_opt(socket, Sock, Config)). + NewConfig = init_stream(Config), + case ?config(type, NewConfig) of + client -> process_stream_features(NewConfig); + server -> process_stream_features(NewConfig); + component -> NewConfig + end. + +tcp_connect(Config) -> + case ?config(socket, Config) of + undefined -> + Owner = self(), + NS = case ?config(type, Config) of + client -> ?NS_CLIENT; + server -> ?NS_SERVER; + component -> ?NS_COMPONENT + end, + ReceiverPid = spawn(fun() -> receiver(NS, Owner) end), + {ok, Sock} = ejabberd_socket:connect( + ?config(server_host, Config), + ?config(server_port, Config), + [binary, {packet, 0}, {active, false}], + infinity, ReceiverPid), + set_opt(socket, Sock, Config); + _ -> + Config + end. init_stream(Config) -> - ok = send_text(Config, io_lib:format(?STREAM_HEADER, - [?config(server, Config)])), - {xmlstreamstart, <<"stream:stream">>, Attrs} = recv(), - <<"jabber:client">> = fxml:get_attr_s(<<"xmlns">>, Attrs), - <<"1.0">> = fxml:get_attr_s(<<"version">>, Attrs), - #stream_features{sub_els = Fs} = recv(), - Mechs = lists:flatmap( - fun(#sasl_mechanisms{list = Ms}) -> - Ms; - (_) -> - [] - end, Fs), - lists:foldl( - fun(#feature_register{}, Acc) -> - set_opt(register, true, Acc); - (#starttls{}, Acc) -> - set_opt(starttls, true, Acc); - (#compression{methods = Ms}, Acc) -> - set_opt(compression, Ms, Acc); - (_, Acc) -> - Acc - end, set_opt(mechs, Mechs, Config), Fs). + Version = ?config(stream_version, Config), + NewConfig = tcp_connect(Config), + send(NewConfig, stream_header(NewConfig)), + XMLNS = case ?config(type, Config) of + client -> ?NS_CLIENT; + component -> ?NS_COMPONENT; + server -> ?NS_SERVER + end, + receive + #stream_start{id = ID, xmlns = XMLNS, version = Version} -> + set_opt(stream_id, ID, NewConfig) + end. + +process_stream_features(Config) -> + receive + #stream_features{sub_els = Fs} -> + Mechs = lists:flatmap( + fun(#sasl_mechanisms{list = Ms}) -> + Ms; + (_) -> + [] + end, Fs), + lists:foldl( + fun(#feature_register{}, Acc) -> + set_opt(register, true, Acc); + (#starttls{}, Acc) -> + set_opt(starttls, true, Acc); + (#compression{methods = Ms}, Acc) -> + set_opt(compression, Ms, Acc); + (_, Acc) -> + Acc + end, set_opt(mechs, Mechs, Config), Fs) + end. disconnect(Config) -> + ct:comment("Disconnecting"), Socket = ?config(socket, Config), - ok = ejabberd_socket:send(Socket, ?STREAM_TRAILER), - {xmlstreamend, <<"stream:stream">>} = recv(), + try + ok = send_text(Config, ?STREAM_TRAILER) + catch exit:normal -> + ok + end, + receive {xmlstreamend, <<"stream:stream">>} -> ok end, + flush(Config), ejabberd_socket:close(Socket), - Config. + ct:comment("Disconnected"), + set_opt(socket, undefined, Config). close_socket(Config) -> Socket = ?config(socket, Config), @@ -163,76 +247,199 @@ close_socket(Config) -> Config. starttls(Config) -> + starttls(Config, false). + +starttls(Config, ShouldFail) -> send(Config, #starttls{}), - #starttls_proceed{} = recv(), - TLSSocket = ejabberd_socket:starttls( - ?config(socket, Config), - [{certfile, ?config(certfile, Config)}, - connect]), - init_stream(set_opt(socket, TLSSocket, Config)). + receive + #starttls_proceed{} when ShouldFail -> + ct:fail(starttls_should_have_failed); + #starttls_failure{} when ShouldFail -> + Config; + #starttls_failure{} -> + ct:fail(starttls_failed); + #starttls_proceed{} -> + TLSSocket = ejabberd_socket:starttls( + ?config(socket, Config), + [{certfile, ?config(certfile, Config)}, + connect]), + set_opt(socket, TLSSocket, Config) + end. zlib(Config) -> send(Config, #compress{methods = [<<"zlib">>]}), - #compressed{} = recv(), + receive #compressed{} -> ok end, ZlibSocket = ejabberd_socket:compress(?config(socket, Config)), - init_stream(set_opt(socket, ZlibSocket, Config)). + process_stream_features(init_stream(set_opt(socket, ZlibSocket, Config))). auth(Config) -> + auth(Config, false). + +auth(Config, ShouldFail) -> + Type = ?config(type, Config), + IsAnonymous = ?config(anonymous, Config), Mechs = ?config(mechs, Config), HaveMD5 = lists:member(<<"DIGEST-MD5">>, Mechs), HavePLAIN = lists:member(<<"PLAIN">>, Mechs), - if HavePLAIN -> - auth_SASL(<<"PLAIN">>, Config); + HaveExternal = lists:member(<<"EXTERNAL">>, Mechs), + HaveAnonymous = lists:member(<<"ANONYMOUS">>, Mechs), + if HaveAnonymous and IsAnonymous -> + auth_SASL(<<"ANONYMOUS">>, Config, ShouldFail); + HavePLAIN -> + auth_SASL(<<"PLAIN">>, Config, ShouldFail); HaveMD5 -> - auth_SASL(<<"DIGEST-MD5">>, Config); + auth_SASL(<<"DIGEST-MD5">>, Config, ShouldFail); + HaveExternal andalso Type == server -> + auth_SASL(<<"EXTERNAL">>, Config, ShouldFail); + Type == client -> + auth_legacy(Config, false, ShouldFail); + Type == component -> + auth_component(Config, ShouldFail); true -> - ct:fail(no_sasl_mechanisms_available) + ct:fail(no_known_sasl_mechanism_available) end. bind(Config) -> - #iq{type = result, sub_els = [#bind{}]} = - send_recv( - Config, - #iq{type = set, - sub_els = [#bind{resource = ?config(resource, Config)}]}), - Config. + U = ?config(user, Config), + S = ?config(server, Config), + R = ?config(resource, Config), + case ?config(type, Config) of + client -> + #iq{type = result, sub_els = [#bind{jid = JID}]} = + send_recv( + Config, #iq{type = set, sub_els = [#bind{resource = R}]}), + case ?config(anonymous, Config) of + false -> + {U, S, R} = jid:tolower(JID), + Config; + true -> + {User, S, Resource} = jid:tolower(JID), + set_opt(user, User, set_opt(resource, Resource, Config)) + end; + component -> + Config + end. open_session(Config) -> - #iq{type = result, sub_els = []} = - send_recv(Config, #iq{type = set, sub_els = [#session{}]}), + open_session(Config, false). + +open_session(Config, Force) -> + if Force -> + #iq{type = result, sub_els = []} = + send_recv(Config, #iq{type = set, sub_els = [#xmpp_session{}]}); + true -> + ok + end, Config. +auth_legacy(Config, IsDigest) -> + auth_legacy(Config, IsDigest, false). + +auth_legacy(Config, IsDigest, ShouldFail) -> + ServerJID = server_jid(Config), + U = ?config(user, Config), + R = ?config(resource, Config), + P = ?config(password, Config), + #iq{type = result, + from = ServerJID, + sub_els = [#legacy_auth{username = <<"">>, + password = <<"">>, + resource = <<"">>} = Auth]} = + send_recv(Config, + #iq{to = ServerJID, type = get, + sub_els = [#legacy_auth{}]}), + Res = case Auth#legacy_auth.digest of + <<"">> when IsDigest -> + StreamID = ?config(stream_id, Config), + D = p1_sha:sha(<<StreamID/binary, P/binary>>), + send_recv(Config, #iq{to = ServerJID, type = set, + sub_els = [#legacy_auth{username = U, + resource = R, + digest = D}]}); + _ when not IsDigest -> + send_recv(Config, #iq{to = ServerJID, type = set, + sub_els = [#legacy_auth{username = U, + resource = R, + password = P}]}) + end, + case Res of + #iq{from = ServerJID, type = result, sub_els = []} -> + if ShouldFail -> + ct:fail(legacy_auth_should_have_failed); + true -> + Config + end; + #iq{from = ServerJID, type = error} -> + if ShouldFail -> + Config; + true -> + ct:fail(legacy_auth_failed) + end + end. + +auth_component(Config, ShouldFail) -> + StreamID = ?config(stream_id, Config), + Password = ?config(password, Config), + Digest = p1_sha:sha(<<StreamID/binary, Password/binary>>), + send(Config, #handshake{data = Digest}), + receive + #handshake{} when ShouldFail -> + ct:fail(component_auth_should_have_failed); + #handshake{} -> + Config; + #stream_error{reason = 'not-authorized'} when ShouldFail -> + Config; + #stream_error{reason = 'not-authorized'} -> + ct:fail(component_auth_failed) + end. + auth_SASL(Mech, Config) -> + auth_SASL(Mech, Config, false). + +auth_SASL(Mech, Config, ShouldFail) -> {Response, SASL} = sasl_new(Mech, ?config(user, Config), ?config(server, Config), ?config(password, Config)), send(Config, #sasl_auth{mechanism = Mech, text = Response}), - wait_auth_SASL_result(set_opt(sasl, SASL, Config)). + wait_auth_SASL_result(set_opt(sasl, SASL, Config), ShouldFail). -wait_auth_SASL_result(Config) -> - case recv() of +wait_auth_SASL_result(Config, ShouldFail) -> + receive + #sasl_success{} when ShouldFail -> + ct:fail(sasl_auth_should_have_failed); #sasl_success{} -> ejabberd_socket:reset_stream(?config(socket, Config)), - send_text(Config, - io_lib:format(?STREAM_HEADER, - [?config(server, Config)])), - {xmlstreamstart, <<"stream:stream">>, Attrs} = recv(), - <<"jabber:client">> = fxml:get_attr_s(<<"xmlns">>, Attrs), - <<"1.0">> = fxml:get_attr_s(<<"version">>, Attrs), - #stream_features{sub_els = Fs} = recv(), - lists:foldl( - fun(#feature_sm{}, ConfigAcc) -> - set_opt(sm, true, ConfigAcc); - (#feature_csi{}, ConfigAcc) -> - set_opt(csi, true, ConfigAcc); - (_, ConfigAcc) -> - ConfigAcc - end, Config, Fs); + send(Config, stream_header(Config)), + Type = ?config(type, Config), + NS = if Type == client -> ?NS_CLIENT; + Type == server -> ?NS_SERVER + end, + receive #stream_start{xmlns = NS, version = {1,0}} -> ok end, + receive #stream_features{sub_els = Fs} -> + if Type == client -> + #xmpp_session{optional = true} = + lists:keyfind(xmpp_session, 1, Fs); + true -> + ok + end, + lists:foldl( + fun(#feature_sm{}, ConfigAcc) -> + set_opt(sm, true, ConfigAcc); + (#feature_csi{}, ConfigAcc) -> + set_opt(csi, true, ConfigAcc); + (#rosterver_feature{}, ConfigAcc) -> + set_opt(rosterver, true, ConfigAcc); + (_, ConfigAcc) -> + ConfigAcc + end, Config, Fs) + end; #sasl_challenge{text = ClientIn} -> {Response, SASL} = (?config(sasl, Config))(ClientIn), send(Config, #sasl_response{text = Response}), - wait_auth_SASL_result(set_opt(sasl, SASL, Config)); + wait_auth_SASL_result(set_opt(sasl, SASL, Config), ShouldFail); + #sasl_failure{} when ShouldFail -> + Config; #sasl_failure{} -> ct:fail(sasl_auth_failed) end. @@ -249,28 +456,44 @@ match_failure(Received, [Match]) when is_list(Match)-> match_failure(Received, Matches) -> ct:fail("Received input:~n~n~p~n~ndon't match expected patterns:~n~n~p", [Received, Matches]). -recv() -> +recv(_Config) -> receive - {'$gen_event', {xmlstreamelement, El}} -> - Pkt = xmpp_codec:decode(fix_ns(El)), - ct:pal("recv: ~p ->~n~s", [El, xmpp_codec:pp(Pkt)]), - Pkt; - {'$gen_event', Event} -> - Event + {fail, El, Why} -> + ct:fail("recv failed: ~p->~n~s", + [El, xmpp:format_error(Why)]); + Event -> + Event + end. + +recv_iq(_Config) -> + receive #iq{} = IQ -> IQ end. + +recv_presence(_Config) -> + receive #presence{} = Pres -> Pres end. + +recv_message(_Config) -> + receive #message{} = Msg -> Msg end. + +decode_stream_element(NS, El) -> + decode(El, NS, []). + +format_element(El) -> + case erlang:function_exported(ct, log, 5) of + true -> ejabberd_web_admin:pretty_print_xml(El); + false -> io_lib:format("~p~n", [El]) end. -fix_ns(#xmlel{name = Tag, attrs = Attrs} = El) - when Tag == <<"stream:features">>; Tag == <<"stream:error">> -> - NewAttrs = [{<<"xmlns">>, <<"http://etherx.jabber.org/streams">>} - |lists:keydelete(<<"xmlns">>, 1, Attrs)], - El#xmlel{attrs = NewAttrs}; -fix_ns(#xmlel{name = Tag, attrs = Attrs} = El) - when Tag == <<"message">>; Tag == <<"iq">>; Tag == <<"presence">> -> - NewAttrs = [{<<"xmlns">>, <<"jabber:client">>} - |lists:keydelete(<<"xmlns">>, 1, Attrs)], - El#xmlel{attrs = NewAttrs}; -fix_ns(El) -> - El. +decode(El, NS, Opts) -> + try + Pkt = xmpp:decode(El, NS, Opts), + ct:pal("RECV:~n~s~n~s", + [format_element(El), xmpp:pp(Pkt)]), + Pkt + catch _:{xmpp_codec, Why} -> + ct:pal("recv failed: ~p->~n~s", + [El, xmpp:format_error(Why)]), + erlang:error({xmpp_codec, Why}) + end. send_text(Config, Text) -> ejabberd_socket:send(?config(socket, Config), Text). @@ -289,18 +512,35 @@ send(State, Pkt) -> _ -> {undefined, Pkt} end, - El = xmpp_codec:encode(NewPkt), - ct:pal("sent: ~p <-~n~s", [El, xmpp_codec:pp(NewPkt)]), - ok = send_text(State, fxml:element_to_binary(El)), + El = xmpp:encode(NewPkt), + ct:pal("SENT:~n~s~n~s", + [format_element(El), xmpp:pp(NewPkt)]), + Data = case NewPkt of + #stream_start{} -> fxml:element_to_header(El); + _ -> fxml:element_to_binary(El) + end, + ok = send_text(State, Data), NewID. -send_recv(State, IQ) -> +send_recv(State, #message{} = Msg) -> + ID = send(State, Msg), + receive #message{id = ID} = Result -> Result end; +send_recv(State, #presence{} = Pres) -> + ID = send(State, Pres), + receive #presence{id = ID} = Result -> Result end; +send_recv(State, #iq{} = IQ) -> ID = send(State, IQ), - #iq{id = ID} = recv(). + receive #iq{id = ID} = Result -> Result end. sasl_new(<<"PLAIN">>, User, Server, Password) -> {<<User/binary, $@, Server/binary, 0, User/binary, 0, Password/binary>>, fun (_) -> {error, <<"Invalid SASL challenge">>} end}; +sasl_new(<<"EXTERNAL">>, _User, _Server, _Password) -> + {<<"">>, + fun(_) -> ct:fail(sasl_challenge_is_not_expected) end}; +sasl_new(<<"ANONYMOUS">>, _User, _Server, _Password) -> + {<<"">>, + fun(_) -> ct:fail(sasl_challenge_is_not_expected) end}; sasl_new(<<"DIGEST-MD5">>, User, Server, Password) -> {<<"">>, fun (ServerIn) -> @@ -395,6 +635,20 @@ muc_room_jid(Config) -> Server = ?config(server, Config), jid:make(<<"test">>, <<"conference.", Server/binary>>, <<>>). +my_muc_jid(Config) -> + Nick = ?config(nick, Config), + RoomJID = muc_room_jid(Config), + jid:replace_resource(RoomJID, Nick). + +peer_muc_jid(Config) -> + PeerNick = ?config(peer_nick, Config), + RoomJID = muc_room_jid(Config), + jid:replace_resource(RoomJID, PeerNick). + +alt_room_jid(Config) -> + Server = ?config(server, Config), + jid:make(<<"alt">>, <<"conference.", Server/binary>>, <<>>). + mix_jid(Config) -> Server = ?config(server, Config), jid:make(<<>>, <<"mix.", Server/binary>>, <<>>). @@ -404,9 +658,9 @@ mix_room_jid(Config) -> jid:make(<<"test">>, <<"mix.", Server/binary>>, <<>>). id() -> - id(undefined). + id(<<>>). -id(undefined) -> +id(<<>>) -> randoms:get_string(); id(ID) -> ID. @@ -415,6 +669,7 @@ get_features(Config) -> get_features(Config, server_jid(Config)). get_features(Config, To) -> + ct:comment("Getting features of ~s", [jid:to_string(To)]), #iq{type = result, sub_els = [#disco_info{features = Features}]} = send_recv(Config, #iq{type = get, sub_els = [#disco_info{}], to = To}), Features. @@ -430,16 +685,82 @@ set_opt(Opt, Val, Config) -> [{Opt, Val}|lists:keydelete(Opt, 1, Config)]. wait_for_master(Config) -> - put_event(Config, slave_ready), - master_ready = get_event(Config). + put_event(Config, peer_ready), + case get_event(Config) of + peer_ready -> + ok; + Other -> + suite:match_failure(Other, peer_ready) + end. wait_for_slave(Config) -> - put_event(Config, master_ready), - slave_ready = get_event(Config). + put_event(Config, peer_ready), + case get_event(Config) of + peer_ready -> + ok; + Other -> + suite:match_failure(Other, peer_ready) + end. make_iq_result(#iq{from = From} = IQ) -> IQ#iq{type = result, to = From, from = undefined, sub_els = []}. +self_presence(Config, Type) -> + MyJID = my_jid(Config), + ct:comment("Sending self-presence"), + #presence{type = Type, from = MyJID} = + send_recv(Config, #presence{type = Type}). + +set_roster(Config, Subscription, Groups) -> + MyJID = my_jid(Config), + {U, S, _} = jid:tolower(MyJID), + PeerJID = ?config(peer, Config), + PeerBareJID = jid:remove_resource(PeerJID), + PeerLJID = jid:tolower(PeerBareJID), + ct:comment("Adding ~s to roster with subscription '~s' in groups ~p", + [jid:to_string(PeerBareJID), Subscription, Groups]), + {atomic, _} = mod_roster:set_roster(#roster{usj = {U, S, PeerLJID}, + us = {U, S}, + jid = PeerLJID, + subscription = Subscription, + groups = Groups}), + Config. + +del_roster(Config) -> + del_roster(Config, ?config(peer, Config)). + +del_roster(Config, PeerJID) -> + MyJID = my_jid(Config), + {U, S, _} = jid:tolower(MyJID), + PeerBareJID = jid:remove_resource(PeerJID), + PeerLJID = jid:tolower(PeerBareJID), + ct:comment("Removing ~s from roster", [jid:to_string(PeerBareJID)]), + {atomic, _} = mod_roster:del_roster(U, S, PeerLJID), + Config. + +get_roster(Config) -> + {LUser, LServer, _} = jid:tolower(my_jid(Config)), + mod_roster:get_roster(LUser, LServer). + +receiver(NS, Owner) -> + MRef = erlang:monitor(process, Owner), + receiver(NS, Owner, MRef). + +receiver(NS, Owner, MRef) -> + receive + {'$gen_event', {xmlstreamelement, El}} -> + Owner ! decode_stream_element(NS, El), + receiver(NS, Owner, MRef); + {'$gen_event', {xmlstreamstart, Name, Attrs}} -> + Owner ! decode(#xmlel{name = Name, attrs = Attrs}, <<>>, []), + receiver(NS, Owner, MRef); + {'$gen_event', Event} -> + Owner ! Event, + receiver(NS, Owner, MRef); + {'DOWN', MRef, process, Owner, _} -> + ok + end. + %%%=================================================================== %%% Clients puts and gets events via this relay. %%%=================================================================== @@ -456,6 +777,7 @@ event_relay() -> event_relay(Events, Subscribers) -> receive {subscribe, From} -> + erlang:monitor(process, From), From ! {ok, self()}, lists:foreach( fun(Event) -> From ! {event, Event, self()} @@ -469,7 +791,19 @@ event_relay(Events, Subscribers) -> (_) -> ok end, Subscribers), - event_relay([Event|Events], Subscribers) + event_relay([Event|Events], Subscribers); + {'DOWN', _MRef, process, Pid, _Info} -> + case lists:member(Pid, Subscribers) of + true -> + NewSubscribers = lists:delete(Pid, Subscribers), + lists:foreach( + fun(Subscriber) -> + Subscriber ! {event, peer_down, self()} + end, NewSubscribers), + event_relay(Events, NewSubscribers); + false -> + event_relay(Events, Subscribers) + end end. subscribe_to_events(Config) -> @@ -494,3 +828,12 @@ get_event(Config) -> {event, Event, Relay} -> Event end. + +flush(Config) -> + receive + {event, peer_down, _} -> flush(Config); + closed -> flush(Config); + Msg -> ct:fail({unexpected_msg, Msg}) + after 0 -> + ok + end. |