diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ejabberd_auth_riak.erl | 7 | ||||
-rw-r--r-- | src/ejabberd_c2s.erl | 5 | ||||
-rw-r--r-- | src/ejabberd_http.erl | 289 | ||||
-rw-r--r-- | src/ejabberd_http_ws.erl | 339 | ||||
-rw-r--r-- | src/ejabberd_odbc.erl | 2 | ||||
-rw-r--r-- | src/ejabberd_riak.erl | 7 | ||||
-rw-r--r-- | src/ejabberd_riak_sup.erl | 7 | ||||
-rw-r--r-- | src/ejabberd_s2s_in.erl | 2 | ||||
-rw-r--r-- | src/ejabberd_s2s_out.erl | 2 | ||||
-rw-r--r-- | src/ejabberd_websocket.erl | 403 | ||||
-rw-r--r-- | src/eldap.erl | 4 | ||||
-rw-r--r-- | src/jlib.erl | 3 | ||||
-rw-r--r-- | src/mod_caps.erl | 7 | ||||
-rw-r--r-- | src/mod_irc_connection.erl | 4 | ||||
-rw-r--r-- | src/mod_muc_room.erl | 185 | ||||
-rw-r--r-- | src/mod_ping.erl | 2 | ||||
-rw-r--r-- | src/mod_pubsub.erl | 61 | ||||
-rw-r--r-- | src/mod_pubsub_odbc.erl | 64 |
18 files changed, 1061 insertions, 332 deletions
diff --git a/src/ejabberd_auth_riak.erl b/src/ejabberd_auth_riak.erl index 081ee6bb8..fb9be2c3e 100644 --- a/src/ejabberd_auth_riak.erl +++ b/src/ejabberd_auth_riak.erl @@ -17,10 +17,9 @@ %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU %%% General Public License for more details. %%% -%%% You should have received a copy of the GNU General Public License -%%% along with this program; if not, write to the Free Software -%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA -%%% 02111-1307 USA +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% %%%---------------------------------------------------------------------- diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 0855da219..7632cb121 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -2848,9 +2848,8 @@ send_stanza_and_ack_req(StateData, Stanza) -> AckReq = #xmlel{name = <<"r">>, attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}], children = []}, - StanzaS = xml:element_to_binary(Stanza), - AckReqS = xml:element_to_binary(AckReq), - send_text(StateData, [StanzaS, AckReqS]). + send_element(StateData, Stanza), + send_element(StateData, AckReq). mgmt_queue_add(StateData, El) -> NewNum = case StateData#state.mgmt_stanzas_out of diff --git a/src/ejabberd_http.erl b/src/ejabberd_http.erl index 3c91c3c58..b624bf447 100644 --- a/src/ejabberd_http.erl +++ b/src/ejabberd_http.erl @@ -109,11 +109,6 @@ init({SockMod, Socket}, Opts) -> {p1_tls, TLSSocket}; true -> {SockMod, Socket} end, - case SockMod1 of - gen_tcp -> - inet:setopts(Socket1, [{packet, http_bin}, {recbuf, 8192}]); - _ -> ok - end, Captcha = case proplists:get_bool(captcha, Opts) of true -> [{[<<"captcha">>], ejabberd_captcha}]; false -> [] @@ -182,22 +177,10 @@ receive_headers(#state{trail = Trail} = State) -> SockMod = State#state.sockmod, Socket = State#state.socket, Data = SockMod:recv(Socket, 0, 300000), - case State#state.sockmod of - gen_tcp -> - NewState = process_header(State, Data), - case NewState#state.end_of_request of - true -> - ok; - _ -> - receive_headers(NewState) - end; - _ -> - case Data of - {ok, D} -> - parse_headers(State#state{trail = <<Trail/binary, D/binary>>}); - {error, _} -> - ok - end + case Data of + {error, _} -> ok; + {ok, D} -> + parse_headers(State#state{trail = <<Trail/binary, D/binary>>}) end. parse_headers(#state{trail = <<>>} = State) -> @@ -270,6 +253,11 @@ process_header(State, Data) -> {ok, {http_header, _, 'Host' = Name, _, Host}} -> State#state{request_host = Host, request_headers = add_header(Name, Host, State)}; + {ok, {http_header, _, Name, _, Value}} when is_binary(Name) -> + State#state{request_headers = + add_header(normalize_header_name(Name), + Value, + State)}; {ok, {http_header, _, Name, _, Value}} -> State#state{request_headers = add_header(Name, Value, State)}; @@ -294,10 +282,6 @@ process_header(State, Data) -> send_text(State2, Out), case State2#state.request_keepalive of true -> - case SockMod of - gen_tcp -> inet:setopts(Socket, [{packet, http_bin}]); - _ -> ok - end, #state{sockmod = SockMod, socket = Socket, options = State#state.options, request_handlers = State#state.request_handlers}; @@ -345,48 +329,93 @@ get_transfer_protocol(SockMod, HostPort) -> %% XXX bard: search through request handlers looking for one that %% matches the requested URL path, and pass control to it. If none is %% found, answer with HTTP 404. -process([], _) -> - ejabberd_web:error(not_found); -process(Handlers, Request) -> - %% Only the first element in the path prefix is checked - [{HandlerPathPrefix, HandlerModule} | HandlersLeft] = - Handlers, - case lists:prefix(HandlerPathPrefix, - Request#request.path) - or (HandlerPathPrefix == Request#request.path) - of - true -> - ?DEBUG("~p matches ~p", - [Request#request.path, HandlerPathPrefix]), - LocalPath = lists:nthtail(length(HandlerPathPrefix), - Request#request.path), - ?DEBUG("~p", [Request#request.headers]), - R = HandlerModule:process(LocalPath, Request), - ejabberd_hooks:run(http_request_debug, - [{LocalPath, Request}]), - R; - false -> process(HandlersLeft, Request) + +process([], _, _, _, _) -> ejabberd_web:error(not_found); +process(Handlers, Request, Socket, SockMod, Trail) -> + {HandlerPathPrefix, HandlerModule, HandlerOpts, HandlersLeft} = + case Handlers of + [{Pfx, Mod} | Tail] -> + {Pfx, Mod, [], Tail}; + [{Pfx, Mod, Opts} | Tail] -> + {Pfx, Mod, Opts, Tail} + end, + + case (lists:prefix(HandlerPathPrefix, Request#request.path) or + (HandlerPathPrefix==Request#request.path)) of + true -> + ?DEBUG("~p matches ~p", [Request#request.path, HandlerPathPrefix]), + %% LocalPath is the path "local to the handler", i.e. if + %% the handler was registered to handle "/test/" and the + %% requested path is "/test/foo/bar", the local path is + %% ["foo", "bar"] + LocalPath = lists:nthtail(length(HandlerPathPrefix), Request#request.path), + R = try + HandlerModule:socket_handoff( + LocalPath, Request, Socket, SockMod, Trail, HandlerOpts) + catch error:undef -> + HandlerModule:process(LocalPath, Request) + end, + ejabberd_hooks:run(http_request_debug, [{LocalPath, Request}]), + R; + false -> + process(HandlersLeft, Request, Socket, SockMod, Trail) end. -process_request(#state{request_method = Method, options = Options, - request_path = {abs_path, Path}, request_auth = Auth, - request_lang = Lang, request_handlers = RequestHandlers, - request_host = Host, request_port = Port, - request_tp = TP, request_headers = RequestHeaders, - sockmod = SockMod, - socket = Socket} = State) - when Method=:='GET' orelse Method=:='HEAD' orelse Method=:='DELETE' orelse Method=:='OPTIONS' -> - case (catch url_decode_q_split(Path)) of - {'EXIT', _} -> +extract_path_query(#state{request_method = Method, + request_path = {abs_path, Path}}) + when Method =:= 'GET' orelse + Method =:= 'HEAD' orelse + Method =:= 'DELETE' orelse Method =:= 'OPTIONS' -> + case catch url_decode_q_split(Path) of + {'EXIT', _} -> false; + {NPath, Query} -> + LPath = normalize_path([NPE + || NPE <- str:tokens(path_decode(NPath), <<"/">>)]), + LQuery = case catch parse_urlencoded(Query) of + {'EXIT', _Reason} -> []; + LQ -> LQ + end, + {LPath, LQuery, <<"">>} + end; +extract_path_query(#state{request_method = Method, + request_path = {abs_path, Path}, + request_content_length = Len, + sockmod = _SockMod, + socket = _Socket} = State) + when (Method =:= 'POST' orelse Method =:= 'PUT') andalso + is_integer(Len) -> + Data = recv_data(State, Len), + ?DEBUG("client data: ~p~n", [Data]), + case catch url_decode_q_split(Path) of + {'EXIT', _} -> false; + {NPath, _Query} -> + LPath = normalize_path([NPE + || NPE <- str:tokens(path_decode(NPath), <<"/">>)]), + LQuery = case catch parse_urlencoded(Data) of + {'EXIT', _Reason} -> []; + LQ -> LQ + end, + {LPath, LQuery, Data} + end; +extract_path_query(_State) -> + false. + +process_request(#state{request_method = Method, + request_auth = Auth, + request_lang = Lang, + sockmod = SockMod, + socket = Socket, + options = Options, + request_host = Host, + request_port = Port, + request_tp = TP, + request_headers = RequestHeaders, + request_handlers = RequestHandlers, + trail = Trail} = State) -> + case extract_path_query(State) of + false -> make_bad_request(State); - {NPath, Query} -> - LPath = normalize_path([NPE || NPE <- str:tokens(path_decode(NPath), <<"/">>)]), - LQuery = case (catch parse_urlencoded(Query)) of - {'EXIT', _Reason} -> - []; - LQ -> - LQ - end, + {LPath, LQuery, Data} -> {ok, IPHere} = case SockMod of gen_tcp -> @@ -396,92 +425,36 @@ process_request(#state{request_method = Method, options = Options, end, XFF = proplists:get_value('X-Forwarded-For', RequestHeaders, []), IP = analyze_ip_xff(IPHere, XFF, Host), - Request = #request{method = Method, - path = LPath, - opts = Options, - q = LQuery, - auth = Auth, - lang = Lang, - host = Host, - port = Port, - tp = TP, - headers = RequestHeaders, - ip = IP}, - %% XXX bard: This previously passed control to - %% ejabberd_web:process_get, now passes it to a local - %% procedure (process) that handles dispatching based on - %% URL path prefix. - case process(RequestHandlers, Request) of - El when element(1, El) == xmlel -> - make_xhtml_output(State, 200, [], El); - {Status, Headers, El} when - element(1, El) == xmlel -> - make_xhtml_output(State, Status, Headers, El); - Output when is_list(Output) or is_binary(Output) -> - make_text_output(State, 200, [], Output); - {Status, Headers, Output} when is_list(Output) or is_binary(Output) -> - make_text_output(State, Status, Headers, Output) - end - end; -process_request(#state{request_method = Method, options = Options, - request_path = {abs_path, Path}, request_auth = Auth, - request_content_length = Len, request_lang = Lang, - sockmod = SockMod, socket = Socket, request_host = Host, - request_port = Port, request_tp = TP, - request_headers = RequestHeaders, - request_handlers = RequestHandlers} = - State) - when (Method =:= 'POST' orelse Method =:= 'PUT') andalso - is_integer(Len) -> - {ok, IPHere} = case SockMod of - gen_tcp -> inet:peername(Socket); - _ -> SockMod:peername(Socket) - end, - XFF = proplists:get_value('X-Forwarded-For', - RequestHeaders, []), - IP = analyze_ip_xff(IPHere, XFF, Host), - case SockMod of - gen_tcp -> inet:setopts(Socket, [{packet, 0}]); - _ -> ok - end, - Data = recv_data(State, Len), - ?DEBUG("client data: ~p~n", [Data]), - case (catch url_decode_q_split(Path)) of - {'EXIT', _} -> - make_bad_request(State); - {NPath, _Query} -> - LPath = normalize_path([NPE || NPE <- str:tokens(path_decode(NPath), <<"/">>)]), - LQuery = case (catch parse_urlencoded(Data)) of - {'EXIT', _Reason} -> - []; - LQ -> - LQ - end, - Request = #request{method = Method, - path = LPath, - q = LQuery, + Request = #request{method = Method, + path = LPath, + q = LQuery, + auth = Auth, + data = Data, + lang = Lang, + host = Host, + port = Port, + tp = TP, opts = Options, - auth = Auth, - data = Data, - lang = Lang, - host = Host, - port = Port, - tp = TP, - headers = RequestHeaders, - ip = IP}, - case process(RequestHandlers, Request) of - El when element(1, El) == xmlel -> - make_xhtml_output(State, 200, [], El); - {Status, Headers, El} when - element(1, El) == xmlel -> - make_xhtml_output(State, Status, Headers, El); - Output when is_list(Output) or is_binary(Output) -> - make_text_output(State, 200, [], Output); - {Status, Headers, Output} when is_list(Output) or is_binary(Output) -> - make_text_output(State, Status, Headers, Output) + headers = RequestHeaders, + ip = IP}, + case process(RequestHandlers, Request, Socket, SockMod, Trail) of + El when is_record(El, xmlel) -> + make_xhtml_output(State, 200, [], El); + {Status, Headers, El} + when is_record(El, xmlel) -> + make_xhtml_output(State, Status, Headers, El); + Output when is_binary(Output) or is_list(Output) -> + make_text_output(State, 200, [], Output); + {Status, Headers, Output} + when is_binary(Output) or is_list(Output) -> + make_text_output(State, Status, Headers, Output); + {Status, Reason, Headers, Output} + when is_binary(Output) or is_list(Output) -> + make_text_output(State, Status, Reason, Headers, Output); + _ -> + none end - end; -process_request(State) -> make_bad_request(State). + end. make_bad_request(State) -> %% Support for X-Forwarded-From @@ -836,6 +809,26 @@ old_integer_to_hex(I) when I >= 16 -> N = trunc(I / 16), old_integer_to_hex(N) ++ old_integer_to_hex(I rem 16). +% The following code is mostly taken from yaws_ssl.erl + +toupper(C) when C >= $a andalso C =< $z -> C - 32; +toupper(C) -> C. + +tolower(C) when C >= $A andalso C =< $Z -> C + 32; +tolower(C) -> C. + +normalize_header_name(Name) -> + normalize_header_name(Name, [], true). + +normalize_header_name(<<"">>, Acc, _) -> + iolist_to_binary(Acc); +normalize_header_name(<<"-", Rest/binary>>, Acc, _) -> + normalize_header_name(Rest, [Acc, "-"], true); +normalize_header_name(<<C:8, Rest/binary>>, Acc, true) -> + normalize_header_name(Rest, [Acc, toupper(C)], false); +normalize_header_name(<<C:8, Rest/binary>>, Acc, false) -> + normalize_header_name(Rest, [Acc, tolower(C)], false). + normalize_path(Path) -> normalize_path(Path, []). diff --git a/src/ejabberd_http_ws.erl b/src/ejabberd_http_ws.erl new file mode 100644 index 000000000..e64212b86 --- /dev/null +++ b/src/ejabberd_http_ws.erl @@ -0,0 +1,339 @@ +%%%---------------------------------------------------------------------- +%%% File : ejabberd_websocket.erl +%%% Author : Eric Cestari <ecestari@process-one.net> +%%% Purpose : XMPP Websocket support +%%% Created : 09-10-2010 by Eric Cestari <ecestari@process-one.net> +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2015 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- +-module(ejabberd_http_ws). + +-author('ecestari@process-one.net'). + +-behaviour(gen_fsm). + +% External exports +-export([start/1, start_link/1, init/1, handle_event/3, + handle_sync_event/4, code_change/4, handle_info/3, + terminate/3, send_xml/2, setopts/2, sockname/1, peername/1, + controlling_process/2, become_controller/2, close/1, + socket_handoff/6]). + +-include("ejabberd.hrl"). +-include("logger.hrl"). + +-include("jlib.hrl"). + +-include("ejabberd_http.hrl"). + +-define(PING_INTERVAL, 60). +-define(WEBSOCKET_TIMEOUT, 300). + +-record(state, + {socket :: ws_socket(), + ping_interval = ?PING_INTERVAL :: pos_integer(), + ping_timer = make_ref() :: reference(), + pong_expected :: boolean(), + timeout = ?WEBSOCKET_TIMEOUT :: pos_integer(), + timer = make_ref() :: reference(), + input = [] :: list(), + waiting_input = false :: false | pid(), + last_receiver :: pid(), + ws :: {#ws{}, pid()}, + rfc_compilant = undefined :: boolean() | undefined}). + +%-define(DBGFSM, true). + +-ifdef(DBGFSM). + +-define(FSMOPTS, [{debug, [trace]}]). + +-else. + +-define(FSMOPTS, []). + +-endif. + +-type ws_socket() :: {http_ws, pid(), {inet:ip_address(), inet:port_number()}}. +-export_type([ws_socket/0]). + +start(WS) -> + supervisor:start_child(ejabberd_wsloop_sup, [WS]). + +start_link(WS) -> + gen_fsm:start_link(?MODULE, [WS], ?FSMOPTS). + +send_xml({http_ws, FsmRef, _IP}, Packet) -> + gen_fsm:sync_send_all_state_event(FsmRef, + {send_xml, Packet}). + +setopts({http_ws, FsmRef, _IP}, Opts) -> + case lists:member({active, once}, Opts) of + true -> + gen_fsm:send_all_state_event(FsmRef, + {activate, self()}); + _ -> ok + end. + +sockname(_Socket) -> {ok, {{0, 0, 0, 0}, 0}}. + +peername({http_ws, _FsmRef, IP}) -> {ok, IP}. + +controlling_process(_Socket, _Pid) -> ok. + +become_controller(FsmRef, C2SPid) -> + gen_fsm:send_all_state_event(FsmRef, + {become_controller, C2SPid}). + +close({http_ws, FsmRef, _IP}) -> + catch gen_fsm:sync_send_all_state_event(FsmRef, close). + +socket_handoff(LocalPath, Request, Socket, SockMod, Buf, Opts) -> + ejabberd_websocket:socket_handoff(LocalPath, Request, Socket, SockMod, + Buf, Opts, ?MODULE, fun get_human_html_xmlel/0). + +%%% Internal + +init([{#ws{ip = IP}, _} = WS]) -> + Opts = [{xml_socket, true} | ejabberd_c2s_config:get_c2s_limits()], + PingInterval = ejabberd_config:get_option( + {websocket_ping_interval, ?MYNAME}, + fun(I) when is_integer(I), I>=0 -> I end, + ?PING_INTERVAL) * 1000, + WSTimeout = ejabberd_config:get_option( + {websocket_timeout, ?MYNAME}, + fun(I) when is_integer(I), I>0 -> I end, + ?WEBSOCKET_TIMEOUT) * 1000, + Socket = {http_ws, self(), IP}, + ?DEBUG("Client connected through websocket ~p", + [Socket]), + ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, + Opts), + Timer = erlang:start_timer(WSTimeout, self(), []), + {ok, loop, + #state{socket = Socket, timeout = WSTimeout, + timer = Timer, ws = WS, + ping_interval = PingInterval}}. + +handle_event({activate, From}, StateName, StateData) -> + case StateData#state.input of + [] -> + {next_state, StateName, + StateData#state{waiting_input = From}}; + Input -> + Receiver = From, + Receiver ! {tcp, StateData#state.socket, Input}, + {next_state, StateName, + StateData#state{input = [], waiting_input = false, + last_receiver = Receiver}} + end. + +handle_sync_event({send_xml, Packet}, _From, StateName, + #state{ws = {_, WsPid}, rfc_compilant = R} = StateData) -> + Packet2 = case {case R of undefined -> true; V -> V end, Packet} of + {true, {xmlstreamstart, _, Attrs}} -> + Attrs2 = [{<<"xmlns">>, <<"urn:ietf:params:xml:ns:xmpp-framing">>} | + lists:keydelete(<<"xmlns">>, 1, lists:keydelete(<<"xmlns:stream">>, 1, Attrs))], + {xmlstreamelement, #xmlel{name = <<"open">>, attrs = Attrs2}}; + {true, {xmlstreamend, _}} -> + {xmlstreamelement, #xmlel{name = <<"close">>, + attrs = [{<<"xmlns">>, <<"urn:ietf:params:xml:ns:xmpp-framing">>}]}}; + {true, {xmlstreamraw, <<"\r\n\r\n">>}} -> % cdata ping + skip; + {true, {xmlstreamelement, #xmlel{name=Name2} = El2}} -> + El3 = case Name2 of + <<"stream:", _/binary>> -> + xml:replace_tag_attr(<<"xmlns:stream">>, ?NS_STREAM, El2); + _ -> + case xml:get_tag_attr_s(<<"xmlns">>, El2) of + <<"">> -> + xml:replace_tag_attr(<<"xmlns">>, <<"jabber:client">>, El2); + _ -> + El2 + end + end, + {xmlstreamelement , El3}; + _ -> + Packet + end, + case Packet2 of + {xmlstreamstart, Name, Attrs3} -> + B = xml:element_to_binary(#xmlel{name = Name, attrs = Attrs3}), + WsPid ! {send, <<(binary:part(B, 0, byte_size(B)-2))/binary, ">">>}; + {xmlstreamend, Name} -> + WsPid ! {send, <<"</", Name/binary, ">">>}; + {xmlstreamelement, El} -> + WsPid ! {send, xml:element_to_binary(El)}; + {xmlstreamraw, Bin} -> + WsPid ! {send, Bin}; + {xmlstreamcdata, Bin2} -> + WsPid ! {send, Bin2}; + skip -> + ok + end, + {reply, ok, StateName, StateData}; +handle_sync_event(close, _From, _StateName, StateData) -> + {stop, normal, StateData}. + +handle_info(closed, _StateName, StateData) -> + {stop, normal, StateData}; +handle_info({received, Packet}, StateName, StateDataI) -> + {StateData, Parsed} = parse(StateDataI, Packet), + SD = case StateData#state.waiting_input of + false -> + Input = StateData#state.input ++ Parsed, + StateData#state{input = Input}; + Receiver -> + Receiver ! {tcp, StateData#state.socket, Parsed}, + setup_timers(StateData#state{waiting_input = false, + last_receiver = Receiver}) + end, + {next_state, StateName, SD}; +handle_info(PingPong, StateName, StateData) when PingPong == ping orelse + PingPong == pong -> + StateData2 = setup_timers(StateData), + {next_state, StateName, + StateData2#state{pong_expected = false}}; +handle_info({timeout, Timer, _}, _StateName, + #state{timer = Timer} = StateData) -> + {stop, normal, StateData}; +handle_info({timeout, Timer, _}, StateName, + #state{ping_timer = Timer, ws = {_, WsPid}} = StateData) -> + case StateData#state.pong_expected of + false -> + cancel_timer(StateData#state.ping_timer), + PingTimer = erlang:start_timer(StateData#state.ping_interval, + self(), []), + WsPid ! {ping, <<>>}, + {next_state, StateName, + StateData#state{ping_timer = PingTimer, pong_expected = true}}; + true -> + {stop, normal, StateData} + end; +handle_info(_, StateName, StateData) -> + {next_state, StateName, StateData}. + +code_change(_OldVsn, StateName, StateData, _Extra) -> + {ok, StateName, StateData}. + +terminate(_Reason, _StateName, StateData) -> + case StateData#state.waiting_input of + false -> ok; + Receiver -> + ?DEBUG("C2S Pid : ~p", [Receiver]), + Receiver ! {tcp_closed, StateData#state.socket} + end, + ok. + +setup_timers(StateData) -> + cancel_timer(StateData#state.timer), + Timer = erlang:start_timer(StateData#state.timeout, + self(), []), + cancel_timer(StateData#state.ping_timer), + PingTimer = case {StateData#state.ping_interval, StateData#state.rfc_compilant} of + {0, _} -> StateData#state.ping_timer; + {_, false} -> StateData#state.ping_timer; + {V, _} -> erlang:start_timer(V, self(), []) + end, + StateData#state{timer = Timer, ping_timer = PingTimer, + pong_expected = false}. + +cancel_timer(Timer) -> + erlang:cancel_timer(Timer), + receive {timeout, Timer, _} -> ok after 0 -> ok end. + +get_human_html_xmlel() -> + Heading = <<"ejabberd ", (jlib:atom_to_binary(?MODULE))/binary>>, + #xmlel{name = <<"html">>, + attrs = + [{<<"xmlns">>, <<"http://www.w3.org/1999/xhtml">>}], + children = + [#xmlel{name = <<"head">>, attrs = [], + children = + [#xmlel{name = <<"title">>, attrs = [], + children = [{xmlcdata, Heading}]}]}, + #xmlel{name = <<"body">>, attrs = [], + children = + [#xmlel{name = <<"h1">>, attrs = [], + children = [{xmlcdata, Heading}]}, + #xmlel{name = <<"p">>, attrs = [], + children = + [{xmlcdata, <<"An implementation of ">>}, + #xmlel{name = <<"a">>, + attrs = + [{<<"href">>, + <<"http://tools.ietf.org/html/rfc6455">>}], + children = + [{xmlcdata, + <<"WebSocket protocol">>}]}]}, + #xmlel{name = <<"p">>, attrs = [], + children = + [{xmlcdata, + <<"This web page is only informative. To " + "use WebSocket connection you need a Jabber/XMPP " + "client that supports it.">>}]}]}]}. + + +parse(#state{rfc_compilant = C} = State, Data) -> + case C of + undefined -> + P = xml_stream:new(self()), + P2 = xml_stream:parse(P, Data), + xml_stream:close(P2), + case parsed_items([]) of + error -> + {State#state{rfc_compilant = true}, <<"parse error">>}; + [] -> + {State#state{rfc_compilant = true}, <<"parse error">>}; + [{xmlstreamstart, <<"open">>, _} | _] -> + parse(State#state{rfc_compilant = true}, Data); + _ -> + parse(State#state{rfc_compilant = false}, Data) + end; + true -> + El = xml_stream:parse_element(Data), + case El of + #xmlel{name = <<"open">>, attrs = Attrs} -> + Attrs2 = [{<<"xmlns:stream">>, ?NS_STREAM}, {<<"xmlns">>, <<"jabber:client">>} | + lists:keydelete(<<"xmlns">>, 1, lists:keydelete(<<"xmlns:stream">>, 1, Attrs))], + {State, [{xmlstreamstart, <<"stream:stream">>, Attrs2}]}; + #xmlel{name = <<"close">>} -> + {State, [{xmlstreamend, <<"stream:stream">>}]}; + {error, _} -> + {State, <<"parse error">>}; + _ -> + {State, [El]} + end; + false -> + {State, Data} + end. + +parsed_items(List) -> + receive + {'$gen_event', El} + when element(1, El) == xmlel; + element(1, El) == xmlstreamstart; + element(1, El) == xmlstreamelement; + element(1, El) == xmlstreamend -> + parsed_items([El | List]); + {'$gen_event', {xmlstreamerror, _}} -> + error + after 0 -> + lists:reverse(List) + end. diff --git a/src/ejabberd_odbc.erl b/src/ejabberd_odbc.erl index 9cf30f53e..5828912d5 100644 --- a/src/ejabberd_odbc.erl +++ b/src/ejabberd_odbc.erl @@ -62,7 +62,7 @@ start_interval = 0 :: non_neg_integer(), host = <<"">> :: binary(), max_pending_requests_len :: non_neg_integer(), - pending_requests = {0, queue:new()} :: {non_neg_integer(), queue()}}). + pending_requests = {0, queue:new()} :: {non_neg_integer(), ?TQUEUE}}). -define(STATE_KEY, ejabberd_odbc_state). diff --git a/src/ejabberd_riak.erl b/src/ejabberd_riak.erl index f677ca91a..c8084674f 100644 --- a/src/ejabberd_riak.erl +++ b/src/ejabberd_riak.erl @@ -16,10 +16,9 @@ %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU %%% General Public License for more details. %%% -%%% You should have received a copy of the GNU General Public License -%%% along with this program; if not, write to the Free Software -%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA -%%% 02111-1307 USA +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% %%%------------------------------------------------------------------- -module(ejabberd_riak). diff --git a/src/ejabberd_riak_sup.erl b/src/ejabberd_riak_sup.erl index 9711e6652..871af5a06 100644 --- a/src/ejabberd_riak_sup.erl +++ b/src/ejabberd_riak_sup.erl @@ -17,10 +17,9 @@ %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU %%% General Public License for more details. %%% -%%% You should have received a copy of the GNU General Public License -%%% along with this program; if not, write to the Free Software -%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA -%%% 02111-1307 USA +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% %%%---------------------------------------------------------------------- diff --git a/src/ejabberd_s2s_in.erl b/src/ejabberd_s2s_in.erl index 7afac4715..1b40f03c2 100644 --- a/src/ejabberd_s2s_in.erl +++ b/src/ejabberd_s2s_in.erl @@ -58,7 +58,7 @@ server = <<"">> :: binary(), authenticated = false :: boolean(), auth_domain = <<"">> :: binary(), - connections = (?DICT):new() :: dict(), + connections = (?DICT):new() :: ?TDICT, timer = make_ref() :: reference()}). %-define(DBGFSM, true). diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl index 3445023ed..97164326d 100644 --- a/src/ejabberd_s2s_out.erl +++ b/src/ejabberd_s2s_out.erl @@ -77,7 +77,7 @@ try_auth = true :: boolean(), myname = <<"">> :: binary(), server = <<"">> :: binary(), - queue = queue:new() :: queue(), + queue = queue:new() :: ?TQUEUE, delay_to_retry = undefined_delay :: undefined_delay | non_neg_integer(), new = false :: false | binary(), verify = false :: false | {pid(), binary(), binary()}, diff --git a/src/ejabberd_websocket.erl b/src/ejabberd_websocket.erl new file mode 100644 index 000000000..8cd1b2289 --- /dev/null +++ b/src/ejabberd_websocket.erl @@ -0,0 +1,403 @@ +%%%---------------------------------------------------------------------- +%%% File : ejabberd_websocket.erl +%%% Author : Eric Cestari <ecestari@process-one.net> +%%% Purpose : XMPP Websocket support +%%% Created : 09-10-2010 by Eric Cestari <ecestari@process-one.net> +%%% +%%% Some code lifted from MISULTIN - WebSocket misultin_websocket.erl - >-|-|-(°> +%%% (http://github.com/ostinelli/misultin/blob/master/src/misultin_websocket.erl) +%%% Copyright (C) 2010, Roberto Ostinelli <roberto@ostinelli.net>, Joe Armstrong. +%%% All rights reserved. +%%% +%%% Code portions from Joe Armstrong have been originally taken under MIT license at the address: +%%% <http://armstrongonsoftware.blogspot.com/2009/12/comet-is-dead-long-live-websockets.html> +%%% +%%% BSD License +%%% +%%% Redistribution and use in source and binary forms, with or without modification, are permitted provided +%%% that the following conditions are met: +%%% +%%% * Redistributions of source code must retain the above copyright notice, this list of conditions and the +%%% following disclaimer. +%%% * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and +%%% the following disclaimer in the documentation and/or other materials provided with the distribution. +%%% * Neither the name of the authors nor the names of its contributors may be used to endorse or promote +%%% products derived from this software without specific prior written permission. +%%% +%%% THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED +%%% WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +%%% PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +%%% ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED +%%% TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +%%% HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +%%% NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +%%% POSSIBILITY OF SUCH DAMAGE. +%%% ========================================================================================================== +%%% ejabberd, Copyright (C) 2002-2015 ProcessOne +%%%---------------------------------------------------------------------- + +-module(ejabberd_websocket). + +-author('ecestari@process-one.net'). + +-export([check/2, socket_handoff/8]). + +-include("ejabberd.hrl"). +-include("logger.hrl"). + +-include("jlib.hrl"). + +-include("ejabberd_http.hrl"). + +-define(CT_XML, {<<"Content-Type">>, <<"text/xml; charset=utf-8">>}). +-define(CT_PLAIN, {<<"Content-Type">>, <<"text/plain">>}). + +-define(AC_ALLOW_ORIGIN, {<<"Access-Control-Allow-Origin">>, <<"*">>}). +-define(AC_ALLOW_METHODS, {<<"Access-Control-Allow-Methods">>, <<"GET, OPTIONS">>}). +-define(AC_ALLOW_HEADERS, {<<"Access-Control-Allow-Headers">>, <<"Content-Type">>}). +-define(AC_MAX_AGE, {<<"Access-Control-Max-Age">>, <<"86400">>}). + +-define(OPTIONS_HEADER, [?CT_PLAIN, ?AC_ALLOW_ORIGIN, ?AC_ALLOW_METHODS, + ?AC_ALLOW_HEADERS, ?AC_MAX_AGE]). +-define(HEADER, [?CT_XML, ?AC_ALLOW_ORIGIN, ?AC_ALLOW_HEADERS]). + +check(_Path, Headers) -> + RequiredHeaders = [{'Upgrade', <<"websocket">>}, + {'Connection', ignore}, {'Host', ignore}, + {<<"Sec-Websocket-Key">>, ignore}, + {<<"Sec-Websocket-Version">>, <<"13">>}], + + F = fun ({Tag, Val}) -> + case lists:keyfind(Tag, 1, Headers) of + false -> true; % header not found, keep in list + {_, HVal} -> + case Val of + ignore -> false; % ignore value -> ok, remove from list + HVal -> false; % expected val -> ok, remove from list + _ -> + true % val is different, keep in list + end + end + end, + case lists:filter(F, RequiredHeaders) of + [] -> true; + _MissingHeaders -> false + end. + +socket_handoff(LocalPath, #request{method = 'GET', ip = IP, q = Q, path = Path, + headers = Headers, host = Host, port = Port}, + Socket, SockMod, Buf, _Opts, HandlerModule, InfoMsgFun) -> + case check(LocalPath, Headers) of + true -> + WS = #ws{socket = Socket, + sockmod = SockMod, + ip = IP, + q = Q, + host = Host, + port = Port, + path = Path, + headers = Headers, + local_path = LocalPath, + buf = Buf}, + + connect(WS, HandlerModule); + _ -> + {200, ?HEADER, InfoMsgFun()} + end; +socket_handoff(_, #request{method = 'OPTIONS'}, _, _, _, _, _, _) -> + {200, ?OPTIONS_HEADER, []}; +socket_handoff(_, #request{method = 'HEAD'}, _, _, _, _, _, _) -> + {200, ?HEADER, []}; +socket_handoff(_, _, _, _, _, _, _, _) -> + {400, ?HEADER, #xmlel{name = <<"h1">>, + children = [{xmlcdata, <<"400 Bad Request">>}]}}. + +connect(#ws{socket = Socket, sockmod = SockMod} = Ws, WsLoop) -> + {NewWs, HandshakeResponse} = handshake(Ws), + SockMod:send(Socket, HandshakeResponse), + + ?DEBUG("Sent handshake response : ~p", + [HandshakeResponse]), + Ws0 = {Ws, self()}, + {ok, WsHandleLoopPid} = WsLoop:start_link(Ws0), + erlang:monitor(process, WsHandleLoopPid), + + case NewWs#ws.buf of + <<>> -> + ok; + Data -> + self() ! {raw, Socket, Data} + end, + + % set opts + case SockMod of + gen_tcp -> + inet:setopts(Socket, [{packet, 0}, {active, true}]); + _ -> + SockMod:setopts(Socket, [{packet, 0}, {active, true}]) + end, + ws_loop(none, Socket, WsHandleLoopPid, SockMod). + +handshake(#ws{headers = Headers} = State) -> + {_, Key} = lists:keyfind(<<"Sec-Websocket-Key">>, 1, + Headers), + SubProtocolHeader = case find_subprotocol(Headers) of + false -> + []; + V -> + [<<"Sec-Websocket-Protocol:">>, V, <<"\r\n">>] + end, + Hash = jlib:encode_base64( + p1_sha:sha1(<<Key/binary, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11">>)), + {State, [<<"HTTP/1.1 101 Switching Protocols\r\n">>, + <<"Upgrade: websocket\r\n">>, + <<"Connection: Upgrade\r\n">>, + SubProtocolHeader, + <<"Sec-WebSocket-Accept: ">>, Hash, <<"\r\n\r\n">>]}. + +find_subprotocol(Headers) -> + case lists:keysearch(<<"Sec-Websocket-Protocol">>, 1, Headers) of + false -> + case lists:keysearch(<<"Websocket-Protocol">>, 1, Headers) of + false -> + false; + {value, {_, Protocol2}} -> + Protocol2 + end; + {value, {_, Protocol}} -> + Protocol + end. + + +ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode) -> + receive + {DataType, _Socket, Data} when DataType =:= tcp orelse DataType =:= raw -> + case handle_data(DataType, FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode) of + {error, Error} -> + ?DEBUG("tls decode error ~p", [Error]), + websocket_close(Socket, WsHandleLoopPid, SocketMode, 1002); % protocol error + {NewFrameInfo, ToSend} -> + lists:foreach(fun(Pkt) -> SocketMode:send(Socket, Pkt) + end, ToSend), + ws_loop(NewFrameInfo, Socket, WsHandleLoopPid, SocketMode) + end; + {tcp_closed, _Socket} -> + ?DEBUG("tcp connection was closed, exit", []), + websocket_close(Socket, WsHandleLoopPid, SocketMode, 0); + {'DOWN', Ref, process, WsHandleLoopPid, Reason} -> + Code = case Reason of + normal -> + 1000; % normal close + _ -> + ?ERROR_MSG("linked websocket controlling loop crashed " + "with reason: ~p", + [Reason]), + 1011 % internal error + end, + erlang:demonitor(Ref), + websocket_close(Socket, WsHandleLoopPid, SocketMode, Code); + {send, Data} -> + SocketMode:send(Socket, encode_frame(Data, 1)), + ws_loop(FrameInfo, Socket, WsHandleLoopPid, + SocketMode); + {ping, Data} -> + SocketMode:send(Socket, encode_frame(Data, 9)), + ws_loop(FrameInfo, Socket, WsHandleLoopPid, + SocketMode); + shutdown -> + ?DEBUG("shutdown request received, closing websocket " + "with pid ~p", + [self()]), + websocket_close(Socket, WsHandleLoopPid, SocketMode, 1001); % going away + _Ignored -> + ?WARNING_MSG("received unexpected message, ignoring: ~p", + [_Ignored]), + ws_loop(FrameInfo, Socket, WsHandleLoopPid, + SocketMode) + end. + +encode_frame(Data, Opcode) -> + case byte_size(Data) of + S1 when S1 < 126 -> + <<1:1, 0:3, Opcode:4, 0:1, S1:7, Data/binary>>; + S2 when S2 < 65536 -> + <<1:1, 0:3, Opcode:4, 0:1, 126:7, S2:16, Data/binary>>; + S3 -> + <<1:1, 0:3, Opcode:4, 0:1, 127:7, S3:64, Data/binary>> + end. + +-record(frame_info, + {mask = none, offset = 0, left, final_frame = true, + opcode, unprocessed = <<>>, unmasked = <<>>, + unmasked_msg = <<>>}). + +decode_header(<<Final:1, _:3, Opcode:4, 0:1, + Len:7, Data/binary>>) + when Len < 126 -> + {Len, Final, Opcode, none, Data}; +decode_header(<<Final:1, _:3, Opcode:4, 0:1, + 126:7, Len:16/integer, Data/binary>>) -> + {Len, Final, Opcode, none, Data}; +decode_header(<<Final:1, _:3, Opcode:4, 0:1, + 127:7, Len:64/integer, Data/binary>>) -> + {Len, Final, Opcode, none, Data}; +decode_header(<<Final:1, _:3, Opcode:4, 1:1, + Len:7, Mask:4/binary, Data/binary>>) + when Len < 126 -> + {Len, Final, Opcode, Mask, Data}; +decode_header(<<Final:1, _:3, Opcode:4, 1:1, + 126:7, Len:16/integer, Mask:4/binary, Data/binary>>) -> + {Len, Final, Opcode, Mask, Data}; +decode_header(<<Final:1, _:3, Opcode:4, 1:1, + 127:7, Len:64/integer, Mask:4/binary, Data/binary>>) -> + {Len, Final, Opcode, Mask, Data}; +decode_header(_) -> none. + +unmask_int(Offset, _, <<>>, Acc) -> + {Acc, Offset}; +unmask_int(0, <<M:32>> = Mask, + <<N:32, Rest/binary>>, Acc) -> + unmask_int(0, Mask, Rest, + <<Acc/binary, (M bxor N):32>>); +unmask_int(0, <<M:8, _/binary>> = Mask, + <<N:8, Rest/binary>>, Acc) -> + unmask_int(1, Mask, Rest, + <<Acc/binary, (M bxor N):8>>); +unmask_int(1, <<_:8, M:8, _/binary>> = Mask, + <<N:8, Rest/binary>>, Acc) -> + unmask_int(2, Mask, Rest, + <<Acc/binary, (M bxor N):8>>); +unmask_int(2, <<_:16, M:8, _/binary>> = Mask, + <<N:8, Rest/binary>>, Acc) -> + unmask_int(3, Mask, Rest, + <<Acc/binary, (M bxor N):8>>); +unmask_int(3, <<_:24, M:8>> = Mask, + <<N:8, Rest/binary>>, Acc) -> + unmask_int(0, Mask, Rest, + <<Acc/binary, (M bxor N):8>>). + +unmask(#frame_info{mask = none} = State, Data) -> + {State, Data}; +unmask(#frame_info{mask = Mask, offset = Offset} = State, Data) -> + {Unmasked, NewOffset} = unmask_int(Offset, Mask, + Data, <<>>), + {State#frame_info{offset = NewOffset}, Unmasked}. + +process_frame(none, Data) -> + process_frame(#frame_info{}, Data); +process_frame(#frame_info{left = Left} = FrameInfo, <<>>) when Left > 0 -> + {FrameInfo, [], []}; +process_frame(#frame_info{unprocessed = none, + unmasked = UnmaskedPre, left = Left} = + State, + Data) + when byte_size(Data) < Left -> + {State2, Unmasked} = unmask(State, Data), + {State2#frame_info{left = Left - byte_size(Data), + unmasked = [UnmaskedPre, Unmasked]}, + [], []}; +process_frame(#frame_info{unprocessed = none, + unmasked = UnmaskedPre, opcode = Opcode, + final_frame = Final, left = Left, + unmasked_msg = UnmaskedMsg} = + FrameInfo, + Data) -> + <<ToProcess:(Left)/binary, Unprocessed/binary>> = Data, + {_, Unmasked} = unmask(FrameInfo, ToProcess), + case Final of + true -> + {FrameInfo3, Recv, Send} = process_frame(#frame_info{}, + Unprocessed), + case Opcode of + X when X < 3 -> + {FrameInfo3, + [iolist_to_binary([UnmaskedMsg, UnmaskedPre, Unmasked]) + | Recv], + Send}; + 9 -> % Ping + Frame = encode_frame(Unprocessed, 10), + {FrameInfo3#frame_info{unmasked_msg = UnmaskedMsg}, [ping | Recv], + [Frame | Send]}; + 10 -> % Pong + {FrameInfo3, [pong | Recv], Send}; + 8 -> % Close + CloseCode = case Unmasked of + <<Code:16/integer-big, Message/binary>> -> + ?DEBUG("WebSocket close op: ~p ~s", + [Code, Message]), + Code; + <<Code:16/integer-big>> -> + ?DEBUG("WebSocket close op: ~p", [Code]), + Code; + _ -> + ?DEBUG("WebSocket close op unknown: ~p", + [Unmasked]), + 1000 + end, + + Frame = encode_frame(<<CloseCode:16/integer-big>>, 8), + {FrameInfo3#frame_info{unmasked_msg=UnmaskedMsg}, Recv, + [Frame | Send]}; + _ -> + {FrameInfo3#frame_info{unmasked_msg = UnmaskedMsg}, Recv, + Send} + end; + _ -> + process_frame(#frame_info{unmasked_msg = + [UnmaskedMsg, UnmaskedPre, + Unmasked]}, + Unprocessed) + end; +process_frame(#frame_info{unprocessed = <<>>} = + FrameInfo, + Data) -> + case decode_header(Data) of + none -> + {FrameInfo#frame_info{unprocessed = Data}, [], []}; + {Len, Final, Opcode, Mask, Rest} -> + process_frame(FrameInfo#frame_info{mask = Mask, + final_frame = Final == 1, + left = Len, opcode = Opcode, + unprocessed = none}, + Rest) + end; +process_frame(#frame_info{unprocessed = + UnprocessedPre} = + FrameInfo, + Data) -> + process_frame(FrameInfo#frame_info{unprocessed = <<>>}, + <<UnprocessedPre/binary, Data/binary>>). + +handle_data(tcp, FrameInfo, Data, Socket, WsHandleLoopPid, p1_tls) -> + case p1_tls:recv_data(Socket, Data) of + {ok, NewData} -> + handle_data_int(FrameInfo, NewData, Socket, WsHandleLoopPid, p1_tls); + {error, Error} -> + {error, Error} + end; +handle_data(_, FrameInfo, Data, Socket, WsHandleLoopPid, SockMod) -> + handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SockMod). + +handle_data_int(FrameInfo, Data, _Socket, WsHandleLoopPid, _SocketMode) -> + {NewFrameInfo, Recv, Send} = process_frame(FrameInfo, Data), + lists:foreach(fun (El) -> + case El of + pong -> + WsHandleLoopPid ! pong; + ping -> + WsHandleLoopPid ! ping; + _ -> + WsHandleLoopPid ! {received, El} + end + end, + Recv), + {NewFrameInfo, Send}. + +websocket_close(Socket, WsHandleLoopPid, + SocketMode, CloseCode) when CloseCode > 0 -> + Frame = encode_frame(<<CloseCode:16/integer-big>>, 8), + SocketMode:send(Socket, Frame), + websocket_close(Socket, WsHandleLoopPid, SocketMode, 0); +websocket_close(Socket, WsHandleLoopPid, SocketMode, _CloseCode) -> + WsHandleLoopPid ! closed, + SocketMode:close(Socket). diff --git a/src/eldap.erl b/src/eldap.erl index c07ddc07b..5e084b01b 100644 --- a/src/eldap.erl +++ b/src/eldap.erl @@ -139,8 +139,8 @@ passwd = <<"">> :: binary(), id = 0 :: non_neg_integer(), bind_timer = make_ref() :: reference(), - dict = dict:new() :: dict(), - req_q = queue:new() :: queue()}). + dict = dict:new() :: ?TDICT, + req_q = queue:new() :: ?TQUEUE}). %%%---------------------------------------------------------------------- %%% API diff --git a/src/jlib.erl b/src/jlib.erl index 2c0f30b3f..76886a7dc 100644 --- a/src/jlib.erl +++ b/src/jlib.erl @@ -57,6 +57,7 @@ %% TODO: Remove once XEP-0091 is Obsolete %% TODO: Remove once XEP-0091 is Obsolete +-include("ejabberd.hrl"). -include("jlib.hrl"). -export_type([jid/0]). @@ -972,7 +973,7 @@ i2l(L, N) when is_binary(L) -> _ -> i2l(<<$0, L/binary>>, N) end. --spec queue_drop_while(fun((term()) -> boolean()), queue()) -> queue(). +-spec queue_drop_while(fun((term()) -> boolean()), ?TQUEUE) -> ?TQUEUE. queue_drop_while(F, Q) -> case queue:peek(Q) of diff --git a/src/mod_caps.erl b/src/mod_caps.erl index 5c6d041f8..a96379e6d 100644 --- a/src/mod_caps.erl +++ b/src/mod_caps.erl @@ -17,10 +17,9 @@ %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU %%% General Public License for more details. %%% -%%% You should have received a copy of the GNU General Public License -%%% along with this program; if not, write to the Free Software -%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA -%%% 02111-1307 USA +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% %%% 2009, improvements from ProcessOne to support correct PEP handling %%% through s2s, use less memory, and speedup global caps handling diff --git a/src/mod_irc_connection.erl b/src/mod_irc_connection.erl index a99e64cdd..cc21b0f14 100644 --- a/src/mod_irc_connection.erl +++ b/src/mod_irc_connection.erl @@ -51,12 +51,12 @@ encoding = <<"">> :: binary(), port = 0 :: inet:port_number(), password = <<"">> :: binary(), - queue = queue:new() :: queue(), + queue = queue:new() :: ?TQUEUE, user = #jid{} :: jid(), host = <<"">> :: binary(), server = <<"">> :: binary(), nick = <<"">> :: binary(), - channels = dict:new() :: dict(), + channels = dict:new() :: ?TDICT, nickchannel :: binary(), mod = mod_irc :: atom(), inbuf = <<"">> :: binary(), diff --git a/src/mod_muc_room.erl b/src/mod_muc_room.erl index 58ac2610b..aae90af4b 100644 --- a/src/mod_muc_room.erl +++ b/src/mod_muc_room.erl @@ -752,6 +752,9 @@ handle_sync_event({change_config, Config}, _From, handle_sync_event({change_state, NewStateData}, _From, StateName, _StateData) -> {reply, {ok, NewStateData}, StateName, NewStateData}; +handle_sync_event({process_item_change, Item, UJID}, _From, StateName, StateData) -> + NSD = process_item_change(Item, StateData, UJID), + {reply, {ok, NSD}, StateName, NSD}; handle_sync_event(_Event, _From, StateName, StateData) -> Reply = ok, {reply, Reply, StateName, StateData}. @@ -2612,114 +2615,7 @@ process_admin_items_set(UJID, Items, Lang, StateData) -> "room ~s:~n ~p", [jlib:jid_to_string(UJID), jlib:jid_to_string(StateData#state.jid), Res]), - NSD = lists:foldl(fun (E, SD) -> - case catch case E of - {JID, affiliation, owner, _} - when JID#jid.luser == - <<"">> -> - %% If the provided JID does not have username, - %% forget the affiliation completely - SD; - {JID, role, none, Reason} -> - catch - send_kickban_presence(UJID, JID, - Reason, - <<"307">>, - SD), - set_role(JID, none, SD); - {JID, affiliation, none, - Reason} -> - case - (SD#state.config)#config.members_only - of - true -> - catch - send_kickban_presence(UJID, JID, - Reason, - <<"321">>, - none, - SD), - SD1 = - set_affiliation(JID, - none, - SD), - set_role(JID, none, - SD1); - _ -> - SD1 = - set_affiliation(JID, - none, - SD), - send_update_presence(JID, - SD1), - SD1 - end; - {JID, affiliation, outcast, - Reason} -> - catch - send_kickban_presence(UJID, JID, - Reason, - <<"301">>, - outcast, - SD), - set_affiliation(JID, - outcast, - set_role(JID, - none, - SD), - Reason); - {JID, affiliation, A, Reason} - when (A == admin) or - (A == owner) -> - SD1 = set_affiliation(JID, - A, - SD, - Reason), - SD2 = set_role(JID, - moderator, - SD1), - send_update_presence(JID, - Reason, - SD2), - SD2; - {JID, affiliation, member, - Reason} -> - SD1 = set_affiliation(JID, - member, - SD, - Reason), - SD2 = set_role(JID, - participant, - SD1), - send_update_presence(JID, - Reason, - SD2), - SD2; - {JID, role, Role, Reason} -> - SD1 = set_role(JID, Role, - SD), - catch - send_new_presence(JID, - Reason, - SD1), - SD1; - {JID, affiliation, A, - _Reason} -> - SD1 = set_affiliation(JID, - A, - SD), - send_update_presence(JID, - SD1), - SD1 - end - of - {'EXIT', ErrReason} -> - ?ERROR_MSG("MUC ITEMS SET ERR: ~p~n", - [ErrReason]), - SD; - NSD -> NSD - end - end, + NSD = lists:foldl(process_item_change(UJID), StateData, lists:flatten(Res)), case (NSD#state.config)#config.persistent of true -> @@ -2732,6 +2628,79 @@ process_admin_items_set(UJID, Items, Lang, StateData) -> Err -> Err end. +process_item_change(UJID) -> + fun(E, SD) -> + process_item_change(E, SD, UJID) + end. + +process_item_change(E, SD, UJID) -> + case catch case E of + {JID, affiliation, owner, _} when JID#jid.luser == <<"">> -> + %% If the provided JID does not have username, + %% forget the affiliation completely + SD; + {JID, role, none, Reason} -> + catch + send_kickban_presence(UJID, JID, + Reason, + <<"307">>, + SD), + set_role(JID, none, SD); + {JID, affiliation, none, Reason} -> + case (SD#state.config)#config.members_only of + true -> + catch + send_kickban_presence(UJID, JID, + Reason, + <<"321">>, + none, + SD), + SD1 = set_affiliation(JID, none, SD), + set_role(JID, none, SD1); + _ -> + SD1 = set_affiliation(JID, none, SD), + send_update_presence(JID, SD1), + SD1 + end; + {JID, affiliation, outcast, Reason} -> + catch + send_kickban_presence(UJID, JID, + Reason, + <<"301">>, + outcast, + SD), + set_affiliation(JID, + outcast, + set_role(JID, none, SD), + Reason); + {JID, affiliation, A, Reason} + when (A == admin) or (A == owner) -> + SD1 = set_affiliation(JID, A, SD, Reason), + SD2 = set_role(JID, moderator, SD1), + send_update_presence(JID, Reason, SD2), + SD2; + {JID, affiliation, member, Reason} -> + SD1 = set_affiliation(JID, member, SD, Reason), + SD2 = set_role(JID, participant, SD1), + send_update_presence(JID, Reason, SD2), + SD2; + {JID, role, Role, Reason} -> + SD1 = set_role(JID, Role, SD), + catch + send_new_presence(JID, Reason, SD1), + SD1; + {JID, affiliation, A, _Reason} -> + SD1 = set_affiliation(JID, A, SD), + send_update_presence(JID, SD1), + SD1 + end + of + {'EXIT', ErrReason} -> + ?ERROR_MSG("MUC ITEMS SET ERR: ~p~n", [ErrReason]), + SD; + NSD -> NSD + end. + find_changed_items(_UJID, _UAffiliation, _URole, [], _Lang, _StateData, Res) -> {result, Res}; diff --git a/src/mod_ping.erl b/src/mod_ping.erl index 87cf6e015..f493dccb8 100644 --- a/src/mod_ping.erl +++ b/src/mod_ping.erl @@ -63,7 +63,7 @@ send_pings = ?DEFAULT_SEND_PINGS :: boolean(), ping_interval = ?DEFAULT_PING_INTERVAL :: non_neg_integer(), timeout_action = none :: none | kill, - timers = (?DICT):new() :: dict()}). + timers = (?DICT):new() :: ?TDICT}). %%==================================================================== %% API diff --git a/src/mod_pubsub.erl b/src/mod_pubsub.erl index 3f4a4d7ec..08e351462 100644 --- a/src/mod_pubsub.erl +++ b/src/mod_pubsub.erl @@ -87,7 +87,7 @@ unsubscribe_node/5, publish_item/6, delete_item/4, - send_items/6, + send_items/7, get_items/2, get_item/3, get_cached_item/2, @@ -864,6 +864,7 @@ send_loop(State) -> N, NodeId, Type, + Options, LJID, last); _ -> ok @@ -960,6 +961,7 @@ send_loop(State) -> Node, NodeId, Type, + Options, LJID, last); true -> @@ -2926,7 +2928,8 @@ subscribe_node(Host, Node, From, JID, Configuration) -> {TNode, {Result, subscribed, SubId, send_last}}} -> NodeId = TNode#pubsub_node.id, Type = TNode#pubsub_node.type, - send_items(Host, Node, NodeId, Type, Subscriber, last), + Options = TNode#pubsub_node.options, + send_items(Host, Node, NodeId, Type, Options, Subscriber, last), case Result of default -> {result, Reply({subscribed, SubId})}; _ -> {result, Result} @@ -3388,14 +3391,15 @@ get_allowed_items_call(Host, NodeIdx, From, Type, Options, Owners) -> %% Node = pubsubNode() %% NodeId = pubsubNodeId() %% Type = pubsubNodeType() +%% Options = mod_pubsub:nodeOptions() %% LJID = {U, S, []} %% Number = last | integer() %% @doc <p>Resend the items of a node to the user.</p> %% @todo use cache-last-item feature -send_items(Host, Node, NodeId, Type, LJID, last) -> +send_items(Host, Node, NodeId, Type, Options, LJID, last) -> case get_cached_item(Host, NodeId) of undefined -> - send_items(Host, Node, NodeId, Type, LJID, 1); + send_items(Host, Node, NodeId, Type, Options, LJID, 1); LastItem -> {ModifNow, ModifUSR} = LastItem#pubsub_item.modification, @@ -3405,9 +3409,9 @@ send_items(Host, Node, NodeId, Type, LJID, last) -> children = itemsEls([LastItem])}], ModifNow, ModifUSR), - dispatch_items(Host, LJID, Node, Stanza) + dispatch_items(Host, LJID, Node, Options, Stanza) end; -send_items(Host, Node, NodeId, Type, LJID, Number) -> +send_items(Host, Node, NodeId, Type, Options, LJID, Number) -> ToSend = case node_action(Host, Type, get_items, [NodeId, LJID]) of @@ -3435,20 +3439,23 @@ send_items(Host, Node, NodeId, Type, LJID, Number) -> attrs = nodeAttr(Node), children = itemsEls(ToSend)}]) end, - dispatch_items(Host, LJID, Node, Stanza). + dispatch_items(Host, LJID, Node, Options, Stanza). --spec(dispatch_items/4 :: +-spec(dispatch_items/5 :: ( - From :: mod_pubsub:host(), - To :: jid(), - Node :: mod_pubsub:nodeId(), - Stanza :: xmlel() | undefined) + From :: mod_pubsub:host(), + To :: jid(), + Node :: mod_pubsub:nodeId(), + Options :: mod_pubsub:nodeOptions(), + Stanza :: xmlel() | undefined) -> any() ). -dispatch_items(_From, _To, _Node, _Stanza = undefined) -> ok; +dispatch_items(_From, _To, _Node, _Options, _Stanza = undefined) -> ok; dispatch_items({FromU, FromS, FromR} = From, {ToU, ToS, ToR} = To, Node, - Stanza) -> + Options, BaseStanza) -> + NotificationType = get_option(Options, notification_type, headline), + Stanza = add_message_type(BaseStanza, NotificationType), C2SPid = case ejabberd_sm:get_session_pid(ToU, ToS, ToR) of ToPid when is_pid(ToPid) -> ToPid; _ -> @@ -3465,7 +3472,9 @@ dispatch_items({FromU, FromS, FromR} = From, {ToU, ToS, ToR} = To, Node, service_jid(From), jlib:make_jid(To), Stanza) end; -dispatch_items(From, To, _Node, Stanza) -> +dispatch_items(From, To, _Node, Options, BaseStanza) -> + NotificationType = get_option(Options, notification_type, headline), + Stanza = add_message_type(BaseStanza, NotificationType), ejabberd_router:route(service_jid(From), jlib:make_jid(To), Stanza). %% @spec (Host, JID, Plugins) -> {error, Reason} | {result, Response} @@ -4485,10 +4494,7 @@ broadcast_stanza(Host, _Node, _NodeId, _Type, NodeOptions, SubsByDepth, NotifyTy NotificationType = get_option(NodeOptions, notification_type, headline), BroadcastAll = get_option(NodeOptions, broadcast_all_resources), %% XXX this is not standard, but usefull From = service_jid(Host), - Stanza = case NotificationType of - normal -> BaseStanza; - MsgType -> add_message_type(BaseStanza, iolist_to_binary(atom_to_list(MsgType))) - end, + Stanza = add_message_type(BaseStanza, NotificationType), %% Handles explicit subscriptions SubIDsByJID = subscribed_nodes_by_jid(NotifyType, SubsByDepth), lists:foreach(fun ({LJID, NodeName, SubIDs}) -> @@ -4520,10 +4526,8 @@ broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, NodeId, Type, Nod SenderResource = user_resource(LUser, LServer, LResource), case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of C2SPid when is_pid(C2SPid) -> - Stanza = case get_option(NodeOptions, notification_type, headline) of - normal -> BaseStanza; - MsgType -> add_message_type(BaseStanza, iolist_to_binary(atom_to_list(MsgType))) - end, + NotificationType = get_option(NodeOptions, notification_type, headline), + Stanza = add_message_type(BaseStanza, NotificationType), %% set the from address on the notification to the bare JID of the account owner %% Also, add "replyto" if entity has presence subscription to the account owner %% See XEP-0163 1.1 section 4.3.1 @@ -5301,10 +5305,19 @@ itemsEls(Items) -> #xmlel{name = <<"item">>, attrs = itemAttr(ItemId), children = Payload} end, Items). +-spec(add_message_type/2 :: +( + Message :: xmlel(), + Type :: atom()) + -> xmlel() +). + +add_message_type(Message, normal) -> Message; add_message_type(#xmlel{name = <<"message">>, attrs = Attrs, children = Els}, Type) -> #xmlel{name = <<"message">>, - attrs = [{<<"type">>, Type} | Attrs], children = Els}; + attrs = [{<<"type">>, jlib:atom_to_binary(Type)} | Attrs], + children = Els}; add_message_type(XmlEl, _Type) -> XmlEl. %% Place of <headers/> changed at the bottom of the stanza diff --git a/src/mod_pubsub_odbc.erl b/src/mod_pubsub_odbc.erl index 3b8ae682a..4b9787821 100644 --- a/src/mod_pubsub_odbc.erl +++ b/src/mod_pubsub_odbc.erl @@ -87,7 +87,7 @@ unsubscribe_node/5, publish_item/6, delete_item/4, - send_items/6, + send_items/7, get_items/2, get_item/3, get_cached_item/2, @@ -464,12 +464,16 @@ send_loop(State) -> type = Type, id = - NodeId} = + NodeId, + options + = + Options} = Node, send_items(H, N, NodeId, Type, + Options, LJID, last); true -> @@ -564,6 +568,7 @@ send_loop(State) -> Node, NodeId, Type, + Options, LJID, last); true -> @@ -2550,7 +2555,8 @@ subscribe_node(Host, Node, From, JID, Configuration) -> {TNode, {Result, subscribed, SubId, send_last}}} -> NodeId = TNode#pubsub_node.id, Type = TNode#pubsub_node.type, - send_items(Host, Node, NodeId, Type, Subscriber, last), + Options = TNode#pubsub_node.options, + send_items(Host, Node, NodeId, Type, Options, Subscriber, last), case Result of default -> {result, Reply({subscribed, SubId})}; _ -> {result, Result} @@ -3018,11 +3024,12 @@ get_allowed_items_call(Host, NodeIdx, From, Type, Options, Owners, RSM) -> %% Node = pubsubNode() %% NodeId = pubsubNodeId() %% Type = pubsubNodeType() +%% Options = mod_pubsubnodeOptions() %% LJID = {U, S, []} %% Number = last | integer() %% @doc <p>Resend the items of a node to the user.</p> %% @todo use cache-last-item feature -send_items(Host, Node, NodeId, Type, LJID, last) -> +send_items(Host, Node, NodeId, Type, Options, LJID, last) -> Stanza = case get_cached_item(Host, NodeId) of undefined -> % special ODBC optimization, works only with node_hometree_odbc, node_flat_odbc and node_pep_odbc @@ -3047,8 +3054,8 @@ send_items(Host, Node, NodeId, Type, LJID, last) -> itemsEls([LastItem])}], ModifNow, ModifUSR) end, - dispatch_items(Host, LJID, Node, Stanza); -send_items(Host, Node, NodeId, Type, LJID, Number) -> + dispatch_items(Host, LJID, Node, Options, Stanza); +send_items(Host, Node, NodeId, Type, Options, LJID, Number) -> ToSend = case node_action(Host, Type, get_items, [NodeId, LJID]) of @@ -3076,20 +3083,23 @@ send_items(Host, Node, NodeId, Type, LJID, Number) -> attrs = nodeAttr(Node), children = itemsEls(ToSend)}]) end, - dispatch_items(Host, LJID, Node, Stanza). + dispatch_items(Host, LJID, Node, Options, Stanza). --spec(dispatch_items/4 :: +-spec(dispatch_items/5 :: ( - From :: mod_pubsub:host(), - To :: jid(), - Node :: mod_pubsub:nodeId(), - Stanza :: xmlel() | undefined) + From :: mod_pubsub:host(), + To :: jid(), + Node :: mod_pubsub:nodeId(), + Options :: mod_pubsub:nodeOptions(), + Stanza :: xmlel() | undefined) -> any() ). -dispatch_items(_From, _To, _Node, _Stanza = undefined) -> ok; +dispatch_items(_From, _To, _Node, _Options, _Stanza = undefined) -> ok; dispatch_items({FromU, FromS, FromR} = From, {ToU, ToS, ToR} = To, Node, - Stanza) -> + Options, BaseStanza) -> + NotificationType = get_option(Options, notification_type, headline), + Stanza = add_message_type(BaseStanza, NotificationType), C2SPid = case ejabberd_sm:get_session_pid(ToU, ToS, ToR) of ToPid when is_pid(ToPid) -> ToPid; _ -> @@ -3106,7 +3116,9 @@ dispatch_items({FromU, FromS, FromR} = From, {ToU, ToS, ToR} = To, Node, service_jid(From), jlib:make_jid(To), Stanza) end; -dispatch_items(From, To, _Node, Stanza) -> +dispatch_items(From, To, _Node, Options, BaseStanza) -> + NotificationType = get_option(Options, notification_type, headline), + Stanza = add_message_type(BaseStanza, NotificationType), ejabberd_router:route(service_jid(From), jlib:make_jid(To), Stanza). %% @spec (Host, JID, Plugins) -> {error, Reason} | {result, Response} @@ -4091,10 +4103,7 @@ broadcast_stanza(Host, _Node, _NodeId, _Type, NodeOptions, SubsByDepth, NotifyTy NotificationType = get_option(NodeOptions, notification_type, headline), BroadcastAll = get_option(NodeOptions, broadcast_all_resources), %% XXX this is not standard, but usefull From = service_jid(Host), - Stanza = case NotificationType of - normal -> BaseStanza; - MsgType -> add_message_type(BaseStanza, iolist_to_binary(atom_to_list(MsgType))) - end, + Stanza = add_message_type(BaseStanza, NotificationType), %% Handles explicit subscriptions SubIDsByJID = subscribed_nodes_by_jid(NotifyType, SubsByDepth), lists:foreach(fun ({LJID, NodeName, SubIDs}) -> @@ -4126,10 +4135,8 @@ broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, NodeId, Type, Nod SenderResource = user_resource(LUser, LServer, LResource), case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of C2SPid when is_pid(C2SPid) -> - Stanza = case get_option(NodeOptions, notification_type, headline) of - normal -> BaseStanza; - MsgType -> add_message_type(BaseStanza, iolist_to_binary(atom_to_list(MsgType))) - end, + NotificationType = get_option(NodeOptions, notification_type, headline), + Stanza = add_message_type(BaseStanza, NotificationType), %% set the from address on the notification to the bare JID of the account owner %% Also, add "replyto" if entity has presence subscription to the account owner %% See XEP-0163 1.1 section 4.3.1 @@ -4966,10 +4973,19 @@ itemsEls(Items) -> #xmlel{name = <<"item">>, attrs = itemAttr(ItemId), children = Payload} end, Items). +-spec(add_message_type/2 :: +( + Message :: xmlel(), + Type :: atom()) + -> xmlel() +). + +add_message_type(Message, normal) -> Message; add_message_type(#xmlel{name = <<"message">>, attrs = Attrs, children = Els}, Type) -> #xmlel{name = <<"message">>, - attrs = [{<<"type">>, Type} | Attrs], children = Els}; + attrs = [{<<"type">>, jlib:atom_to_binary(Type)} | Attrs], + children = Els}; add_message_type(XmlEl, _Type) -> XmlEl. %% Place of <headers/> changed at the bottom of the stanza |