diff options
author | Badlop <badlop@process-one.net> | 2009-08-31 18:37:52 +0000 |
---|---|---|
committer | Badlop <badlop@process-one.net> | 2009-08-31 18:37:52 +0000 |
commit | a033b0615072010929379a64e7e285d1a27506ba (patch) | |
tree | ed33dd0bd06b6bd74d3faa5ed9e2f299fcb2bf0a /src/web/ejabberd_http_bind.erl | |
parent | Include in release notes Zlib+STARTTLS support (diff) |
BOSH module optimization and clean-up (thanks to Aleksey Shchepin and Mickaël Rémond)(EJAB-936)
SVN Revision: 2574
Diffstat (limited to 'src/web/ejabberd_http_bind.erl')
-rw-r--r-- | src/web/ejabberd_http_bind.erl | 931 |
1 files changed, 504 insertions, 427 deletions
diff --git a/src/web/ejabberd_http_bind.erl b/src/web/ejabberd_http_bind.erl index 68bb8e38..336d4171 100644 --- a/src/web/ejabberd_http_bind.erl +++ b/src/web/ejabberd_http_bind.erl @@ -4,29 +4,11 @@ %%% Purpose : Implements XMPP over BOSH (XEP-0205) (formerly known as %%% HTTP Binding) %%% Created : 21 Sep 2005 by Stefan Strigler <steve@zeank.in-berlin.de> -%%% -%%% -%%% ejabberd, Copyright (C) 2002-2009 ProcessOne -%%% -%%% This program is free software; you can redistribute it and/or -%%% modify it under the terms of the GNU General Public License as -%%% published by the Free Software Foundation; either version 2 of the -%%% License, or (at your option) any later version. -%%% -%%% This program is distributed in the hope that it will be useful, -%%% but WITHOUT ANY WARRANTY; without even the implied warranty of -%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -%%% General Public License for more details. -%%% -%%% You should have received a copy of the GNU General Public License -%%% along with this program; if not, write to the Free Software -%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA -%%% 02111-1307 USA -%%% +%%% Modified: may 2009 by Mickael Remond, Alexey Schepin +%%% Id : $Id: ejabberd_http_bind.erl 953 2009-05-07 10:40:40Z alexey $ %%%---------------------------------------------------------------------- -module(ejabberd_http_bind). --author('steve@zeank.in-berlin.de'). -behaviour(gen_fsm). @@ -39,15 +21,19 @@ handle_info/3, terminate/3, send/2, - setopts/2, + send_xml/2, sockname/1, peername/1, + setopts/2, controlling_process/2, + become_controller/2, + custom_receiver/1, + reset_stream/1, + change_shaper/2, + monitor/1, close/1, process_request/2]). --define(ejabberd_debug, true). - -include("ejabberd.hrl"). -include("jlib.hrl"). -include("ejabberd_http.hrl"). @@ -59,7 +45,6 @@ %% http binding request -record(hbr, {rid, key, - in, out}). -record(state, {id, @@ -67,8 +52,10 @@ key, socket, output = "", - input = "", + input = queue:new(), waiting_input = false, + shaper_state, + shaper_timer, last_receiver, last_poll, http_receiver, @@ -76,22 +63,30 @@ ctime = 0, timer, pause=0, - unprocessed_req_list = [], % list of request that have been delayed for proper reordering + unprocessed_req_list = [], % list of request that have been delayed for proper reordering: {Request, PID} req_list = [], % list of requests (cache) max_inactivity, + max_pause, ip = ?NULL_PEER }). +%% Internal request format: +-record(http_put, {rid, + attrs, + payload, + payload_size, + hold, + stream, + ip}). %%-define(DBGFSM, true). - -ifdef(DBGFSM). -define(FSMOPTS, [{debug, [trace]}]). -else. -define(FSMOPTS, []). -endif. --define(BOSH_VERSION, "1.6"). +-define(BOSH_VERSION, "1.8"). -define(NS_CLIENT, "jabber:client"). -define(NS_BOSH, "urn:xmpp:xbosh"). -define(NS_HTTP_BIND, "http://jabber.org/protocol/httpbind"). @@ -128,6 +123,9 @@ start_link(Sid, Key, IP) -> send({http_bind, FsmRef, _IP}, Packet) -> gen_fsm:sync_send_all_state_event(FsmRef, {send, Packet}). +send_xml({http_bind, FsmRef, _IP}, Packet) -> + gen_fsm:sync_send_all_state_event(FsmRef, {send_xml, Packet}). + setopts({http_bind, FsmRef, _IP}, Opts) -> case lists:member({active, once}, Opts) of true -> @@ -139,6 +137,21 @@ setopts({http_bind, FsmRef, _IP}, Opts) -> controlling_process(_Socket, _Pid) -> ok. +custom_receiver({http_bind, FsmRef, _IP}) -> + {receiver, ?MODULE, FsmRef}. + +become_controller(FsmRef, C2SPid) -> + gen_fsm:send_all_state_event(FsmRef, {become_controller, C2SPid}). + +reset_stream({http_bind, _FsmRef, _IP}) -> + ok. + +change_shaper({http_bind, FsmRef, _IP}, Shaper) -> + gen_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}). + +monitor({http_bind, FsmRef, _IP}) -> + erlang:monitor(process, FsmRef). + close({http_bind, FsmRef, _IP}) -> catch gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}). @@ -148,8 +161,19 @@ sockname(_Socket) -> peername({http_bind, _FsmRef, IP}) -> {ok, IP}. + +%% Entry point for data coming from client through ejabberd HTTP server: process_request(Data, IP) -> - case catch parse_request(Data) of + Opts1 = ejabberd_c2s_config:get_c2s_limits(), + Opts = [{xml_socket, true} | Opts1], + MaxStanzaSize = + case lists:keysearch(max_stanza_size, 1, Opts) of + {value, {_, Size}} -> Size; + _ -> infinity + end, + PayloadSize = iolist_size(Data), + case catch parse_request(Data, PayloadSize, MaxStanzaSize) of + %% No existing session: {ok, {"", Rid, Attrs, Payload}} -> case xml:get_attr_s("to",Attrs) of "" -> @@ -166,11 +190,13 @@ process_request(Data, IP) -> "condition='internal-server-error' " "xmlns='" ++ ?NS_HTTP_BIND ++ "'>BOSH module not started</body"}; {ok, Pid} -> - handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, IP) + handle_session_start( + Pid, XmppDomain, Sid, Rid, Attrs, + Payload, PayloadSize, IP) end end; + %% Existing session {ok, {Sid, Rid, Attrs, Payload1}} -> - %% old session StreamStart = case xml:get_attr_s("xmpp:restart",Attrs) of "true" -> @@ -181,17 +207,21 @@ process_request(Data, IP) -> Payload2 = case xml:get_attr_s("type",Attrs) of "terminate" -> %% close stream - Payload1 ++ "</stream:stream>"; + Payload1 ++ [{xmlstreamend, "stream:stream"}]; _ -> Payload1 end, - handle_http_put(Sid, Rid, Attrs, Payload2, StreamStart, IP); + handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize, + StreamStart, IP); + {error, size_limit} -> + {413, ?HEADER, "Request Too Large"}; _ -> - ?ERROR_MSG("Received bad request: ~p", [Data]), + ?DEBUG("Received bad request: ~p", [Data]), {400, ?HEADER, ""} end. -handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, IP) -> +handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, + Payload, PayloadSize, IP) -> ?DEBUG("got pid: ~p", [Pid]), Wait = case string:to_integer(xml:get_attr_s("wait",Attrs)) of {error, _} -> @@ -235,7 +265,7 @@ handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, IP) -> version = Version }) end), - handle_http_put(Sid, Rid, Attrs, Payload, true, IP). + handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, true, IP). %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm @@ -257,58 +287,46 @@ init([Sid, Key, IP]) -> %% each connector. The default behaviour should be however to use %% the default c2s restrictions if not defined for the current %% connector. - Opts = ejabberd_c2s_config:get_c2s_limits(), + Opts1 = ejabberd_c2s_config:get_c2s_limits(), + Opts = [{xml_socket, true} | Opts1], + Shaper = none, + ShaperState = shaper:new(Shaper), Socket = {http_bind, self(), IP}, ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, Opts), Timer = erlang:start_timer(?MAX_INACTIVITY, self(), []), {ok, loop, #state{id = Sid, key = Key, socket = Socket, + shaper_state = ShaperState, max_inactivity = ?MAX_INACTIVITY, + max_pause = ?MAX_PAUSE, timer = Timer}}. %%---------------------------------------------------------------------- -%% Func: StateName/2 -%% Returns: {next_state, NextStateName, NextStateData} | -%% {next_state, NextStateName, NextStateData, Timeout} | -%% {stop, Reason, NewStateData} -%%---------------------------------------------------------------------- - - -%%---------------------------------------------------------------------- -%% Func: StateName/3 -%% Returns: {next_state, NextStateName, NextStateData} | -%% {next_state, NextStateName, NextStateData, Timeout} | -%% {reply, Reply, NextStateName, NextStateData} | -%% {reply, Reply, NextStateName, NextStateData, Timeout} | -%% {stop, Reason, NewStateData} | -%% {stop, Reason, Reply, NewStateData} -%%---------------------------------------------------------------------- -%state_name(Event, From, StateData) -> -% Reply = ok, -% {reply, Reply, state_name, StateData}. - -%%---------------------------------------------------------------------- %% Func: handle_event/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- -handle_event({activate, From}, StateName, StateData) -> +handle_event({become_controller, C2SPid}, StateName, StateData) -> case StateData#state.input of - "" -> + cancel -> {next_state, StateName, StateData#state{ - waiting_input = {From, ok}}}; + waiting_input = C2SPid}}; Input -> - Receiver = From, - Receiver ! {tcp, StateData#state.socket, list_to_binary(Input)}, + lists:foreach( + fun(Event) -> + C2SPid ! Event + end, queue:to_list(Input)), {next_state, StateName, StateData#state{ - input = "", - waiting_input = false, - last_receiver = Receiver}} + input = queue:new(), + waiting_input = C2SPid}} end; +handle_event({change_shaper, Shaper}, StateName, StateData) -> + NewShaperState = shaper:new(Shaper), + {next_state, StateName, StateData#state{shaper_state = NewShaperState}}; handle_event(_Event, StateName, StateData) -> {next_state, StateName, StateData}. @@ -321,37 +339,34 @@ handle_event(_Event, StateName, StateData) -> %% {stop, Reason, NewStateData} | %% {stop, Reason, Reply, NewStateData} %%---------------------------------------------------------------------- -handle_sync_event({send, Packet}, _From, StateName, StateData) -> - Output = [StateData#state.output | Packet], - if - StateData#state.http_receiver /= undefined -> - cancel_timer(StateData#state.timer), - Timer = if - StateData#state.pause > 0 -> - erlang:start_timer( - StateData#state.pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, - HTTPReply = case Output of - [[]| OutPacket] -> - {ok, OutPacket}; - _ -> - {ok, Output} - end, - gen_fsm:reply(StateData#state.http_receiver, HTTPReply), - cancel_timer(StateData#state.wait_timer), - Reply = ok, - {reply, Reply, StateName, - StateData#state{output = [], - http_receiver = undefined, - wait_timer = undefined, - timer = Timer}}; - true -> - Reply = ok, - {reply, Reply, StateName, StateData#state{output = Output}} - end; +handle_sync_event({send_xml, Packet}, _From, StateName, + #state{http_receiver = undefined} = StateData) -> + Output = [Packet | StateData#state.output], + Reply = ok, + {reply, Reply, StateName, StateData#state{output = Output}}; +handle_sync_event({send_xml, Packet}, _From, StateName, StateData) -> + Output = [Packet | StateData#state.output], + cancel_timer(StateData#state.timer), + Timer = set_inactivity_timer(StateData#state.pause, + StateData#state.max_inactivity), + HTTPReply = {ok, Output}, + gen_fsm:reply(StateData#state.http_receiver, HTTPReply), + cancel_timer(StateData#state.wait_timer), + Rid = StateData#state.rid, + ReqList = [#hbr{rid = Rid, + key = StateData#state.key, + out = Output + } | + [El || El <- StateData#state.req_list, + El#hbr.rid /= Rid ] + ], + Reply = ok, + {reply, Reply, StateName, + StateData#state{output = [], + http_receiver = undefined, + req_list = ReqList, + wait_timer = undefined, + timer = Timer}}; handle_sync_event({stop,close}, _From, _StateName, StateData) -> Reply = ok, @@ -360,130 +375,82 @@ handle_sync_event({stop,stream_closed}, _From, _StateName, StateData) -> Reply = ok, {stop, normal, Reply, StateData}; handle_sync_event({stop,Reason}, _From, _StateName, StateData) -> - ?DEBUG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]), + ?ERROR_MSG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]), Reply = ok, {stop, normal, Reply, StateData}; - %% HTTP PUT: Receive packets from the client -handle_sync_event({http_put, Rid, Attrs, _Payload, Hold, _StreamTo, _IP}=Request, - _From, StateName, StateData) -> - %% Check if Rid valid - RidAllow = - case StateData#state.rid of - none -> - %% First request - nothing saved so far - {true, 0}; - OldRid -> - ?DEBUG("state.rid/cur rid: ~p/~p", [OldRid, Rid]), - if - %% We did not miss any packet, we can process it immediately: - Rid == OldRid + 1 -> - case catch list_to_integer( - xml:get_attr_s("pause", Attrs)) of - {'EXIT', _} -> - {true, 0}; - Pause1 when Pause1 =< ?MAX_PAUSE -> - ?DEBUG("got pause: ~p", [Pause1]), - {true, Pause1}; - _ -> - {true, 0} - end; - %% We have missed packets, we need to cached it to process it later on: - (OldRid < Rid) and - (Rid =< (OldRid + Hold + 1)) -> - buffer; - (Rid =< OldRid) and - (Rid > OldRid - Hold - 1) -> - repeat; - true -> - false - end +handle_sync_event(#http_put{rid = Rid}, + _From, StateName, StateData) + when StateData#state.shaper_timer /= undefined -> + Pause = + case erlang:read_timer(StateData#state.shaper_timer) of + false -> + 0; + P -> P end, + Reply = {wait, Pause}, + ?DEBUG("Shaper timer for RID ~p: ~p", [Rid, Reply]), + {reply, Reply, StateName, StateData}; - %% Check if Rid is in sequence or out of sequence: - case RidAllow of - buffer -> - ?DEBUG("Buffered request: ~p", [Request]), - %% Request is out of sequence: - PendingRequests = StateData#state.unprocessed_req_list, - %% In case an existing RID was already buffered: - Requests = lists:keydelete(Rid, 2, PendingRequests), - {reply, ok, StateName, StateData#state{unprocessed_req_list=[Request|Requests]}}; - _ -> - %% Request is in sequence: - process_http_put(Request, StateName, StateData, RidAllow) - end; +handle_sync_event(#http_put{rid = _Rid, attrs = _Attrs, + payload_size = PayloadSize, + hold = _Hold} = Request, + _From, StateName, StateData) -> + ?DEBUG("New request: ~p",[Request]), + %% Updating trafic shaper + {NewShaperState, NewShaperTimer} = + update_shaper(StateData#state.shaper_state, PayloadSize), + + handle_http_put_event(Request, StateName, + StateData#state{shaper_state = NewShaperState, + shaper_timer = NewShaperTimer}); %% HTTP GET: send packets to the client handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) -> %% setup timer - if - StateData#state.http_receiver /= undefined -> - gen_fsm:reply(StateData#state.http_receiver, {ok, empty}); - true -> - ok - end, + send_receiver_reply(StateData#state.http_receiver, {ok, empty}), cancel_timer(StateData#state.wait_timer), - {TMegSec, TSec, TMSec} = now(), - TNow = (TMegSec * 1000000 + TSec) * 1000000 + TMSec, + TNow = tnow(), if (Hold > 0) and - (StateData#state.output == "") and + (StateData#state.output == []) and ((TNow - StateData#state.ctime) < (Wait*1000*1000)) and (StateData#state.rid == Rid) and - (StateData#state.input /= "cancel") and + (StateData#state.input /= cancel) and (StateData#state.pause == 0) -> WaitTimer = erlang:start_timer(Wait * 1000, self(), []), + %% MR: Not sure we should cancel the state timer here. cancel_timer(StateData#state.timer), {next_state, StateName, StateData#state{ http_receiver = From, wait_timer = WaitTimer, timer = undefined}}; - (StateData#state.input == "cancel") -> + (StateData#state.input == cancel) -> cancel_timer(StateData#state.timer), - Timer = if - StateData#state.pause > 0 -> - erlang:start_timer( - StateData#state.pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, + Timer = set_inactivity_timer(StateData#state.pause, + StateData#state.max_inactivity), Reply = {ok, cancel}, {reply, Reply, StateName, StateData#state{ - input = "", + input = queue:new(), http_receiver = undefined, wait_timer = undefined, timer = Timer}}; true -> cancel_timer(StateData#state.timer), - Timer = if - StateData#state.pause > 0 -> - erlang:start_timer( - StateData#state.pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, - case StateData#state.output of - [[]| OutPacket] -> - Reply = {ok, OutPacket}; - _ -> - Reply = {ok, StateData#state.output} - end, + Timer = set_inactivity_timer(StateData#state.pause, + StateData#state.max_inactivity), + Reply = {ok, StateData#state.output}, %% save request - ReqList = [#hbr{rid=Rid, - key=StateData#state.key, - in=StateData#state.input, - out=StateData#state.output + ReqList = [#hbr{rid = Rid, + key = StateData#state.key, + out = StateData#state.output } | [El || El <- StateData#state.req_list, El#hbr.rid /= Rid ] ], {reply, Reply, StateName, StateData#state{ - input = "", - output = "", + output = [], http_receiver = undefined, wait_timer = undefined, timer = Timer, @@ -507,6 +474,7 @@ code_change(_OldVsn, StateName, StateData, _Extra) -> %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- +%% We reached the max_inactivity timeout: handle_info({timeout, Timer, _}, _StateName, #state{id=SID, timer = Timer} = StateData) -> ?WARNING_MSG("Session timeout. Closing the HTTP bind session: ~p", [SID]), @@ -517,23 +485,30 @@ handle_info({timeout, WaitTimer, _}, StateName, if StateData#state.http_receiver /= undefined -> cancel_timer(StateData#state.timer), - Timer = if - StateData#state.pause > 0 -> - erlang:start_timer( - StateData#state.pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, + Timer = set_inactivity_timer(StateData#state.pause, + StateData#state.max_inactivity), gen_fsm:reply(StateData#state.http_receiver, {ok, empty}), + Rid = StateData#state.rid, + ReqList = [#hbr{rid = Rid, + key = StateData#state.key, + out = [] + } | + [El || El <- StateData#state.req_list, + El#hbr.rid /= Rid ] + ], {next_state, StateName, StateData#state{http_receiver = undefined, + req_list = ReqList, wait_timer = undefined, timer = Timer}}; true -> {next_state, StateName, StateData} end; +handle_info({timeout, ShaperTimer, _}, StateName, + #state{shaper_timer = ShaperTimer} = StateData) -> + {next_state, StateName, StateData#state{shaper_timer = undefined}}; + handle_info(_, StateName, StateData) -> {next_state, StateName, StateData}. @@ -548,19 +523,12 @@ terminate(_Reason, _StateName, StateData) -> fun() -> mnesia:delete({http_bind, StateData#state.id}) end), - if - StateData#state.http_receiver /= undefined -> - gen_fsm:reply(StateData#state.http_receiver, {ok, terminate}); - true -> - ok - end, + send_receiver_reply(StateData#state.http_receiver, {ok, terminate}), case StateData#state.waiting_input of false -> - case StateData#state.last_receiver of - undefined -> ok; - Receiver -> Receiver ! {tcp_closed, StateData#state.socket} - end; - {Receiver, _Tag} -> Receiver ! {tcp_closed, StateData#state.socket} + ok; + C2SPid -> + gen_fsm:send_event(C2SPid, closed) end, ok. @@ -569,8 +537,47 @@ terminate(_Reason, _StateName, StateData) -> %%%---------------------------------------------------------------------- %% PUT / Get processing: -process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, +handle_http_put_event(#http_put{rid = Rid, attrs = Attrs, + hold = Hold} = Request, + StateName, StateData) -> + ?DEBUG("New request: ~p",[Request]), + %% Check if Rid valid + RidAllow = rid_allow(StateData#state.rid, Rid, Attrs, Hold, + StateData#state.max_pause), + + %% Check if Rid is in sequence or out of sequence: + case RidAllow of + buffer -> + ?DEBUG("Buffered request: ~p", [Request]), + %% Request is out of sequence: + PendingRequests = StateData#state.unprocessed_req_list, + %% In case an existing RID was already buffered: + Requests = lists:keydelete(Rid, 2, PendingRequests), + ReqList = [#hbr{rid = Rid, + key = StateData#state.key, + out = [] + } | + [El || El <- StateData#state.req_list, + El#hbr.rid > (Rid - 1 - Hold)] + ], + ?DEBUG("reqlist: ~p", [ReqList]), + UnprocessedReqList = [Request | Requests], + cancel_timer(StateData#state.timer), + Timer = set_inactivity_timer(0, StateData#state.max_inactivity), + {reply, buffered, StateName, + StateData#state{unprocessed_req_list = UnprocessedReqList, + req_list = ReqList, + timer = Timer}}; + _ -> + %% Request is in sequence: + process_http_put(Request, StateName, StateData, RidAllow) + end. + +process_http_put(#http_put{rid = Rid, attrs = Attrs, payload = Payload, + hold = Hold, stream = StreamTo, + ip = IP} = Request, StateName, StateData, RidAllow) -> + ?DEBUG("Actually processing request: ~p", [Request]), %% Check if key valid Key = xml:get_attr_s("key", Attrs), NewKey = xml:get_attr_s("newkey", Attrs), @@ -596,16 +603,15 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, end end end, - {TMegSec, TSec, TMSec} = now(), - TNow = (TMegSec * 1000000 + TSec) * 1000000 + TMSec, + TNow = tnow(), LastPoll = if - Payload == "" -> + Payload == [] -> TNow; true -> 0 end, if - (Payload == "") and + (Payload == []) and (Hold == 0) and (TNow - StateData#state.last_poll < ?MIN_POLLING) -> Reply = {error, polling_too_frequently}, @@ -618,17 +624,15 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, repeat -> ?DEBUG("REPEATING ~p", [Rid]), Reply = case [El#hbr.out || - El <- StateData#state.req_list, + El <- StateData#state.req_list, El#hbr.rid == Rid] of [] -> {error, not_exists}; - [ [[] | Out] | _XS] -> - {repeat, Out}; [Out | _XS] -> - {repeat, Out} - end, - {reply, Reply, StateName, - StateData#state{input = "cancel", last_poll = LastPoll}}; + {repeat, lists:reverse(Out)} + end, + {reply, Reply, StateName, StateData#state{input = cancel, + last_poll = LastPoll}}; {true, Pause} -> SaveKey = if NewKey == "" -> @@ -639,30 +643,33 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, ?DEBUG(" -- SaveKey: ~s~n", [SaveKey]), %% save request - ReqList = [#hbr{rid=Rid, - key=StateData#state.key, - in=StateData#state.input, - out=StateData#state.output - } | - [El || El <- StateData#state.req_list, - El#hbr.rid < Rid, - El#hbr.rid > (Rid - 1 - Hold)] - ], + ReqList1 = + [El || El <- StateData#state.req_list, + El#hbr.rid > (Rid - 1 - Hold)], + ReqList = + case lists:keymember(Rid, #hbr.rid, ReqList1) of + true -> + ReqList1; + false -> + [#hbr{rid = Rid, + key = StateData#state.key, + out = [] + } | + ReqList1 + ] + end, ?DEBUG("reqlist: ~p", [ReqList]), %% setup next timer cancel_timer(StateData#state.timer), - Timer = if - Pause > 0 -> - erlang:start_timer( - Pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, + Timer = set_inactivity_timer(Pause, + StateData#state.max_inactivity), case StateData#state.waiting_input of false -> - Input = Payload ++ [StateData#state.input], + Input = + lists:foldl( + fun queue:in/2, + StateData#state.input, Payload), Reply = ok, process_buffered_request(Reply, StateName, StateData#state{input = Input, @@ -675,32 +682,42 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, req_list = ReqList, ip = IP }); - {Receiver, _Tag} -> - SendPacket = - case StreamTo of - {To, ""} -> - ["<stream:stream to='", To, "' " - "xmlns='"++?NS_CLIENT++"' " - "xmlns:stream='"++?NS_STREAM++"'>"] - ++ Payload; - {To, Version} -> - ["<stream:stream to='", To, "' " - "xmlns='"++?NS_CLIENT++"' " - "version='", Version, "' " - "xmlns:stream='"++?NS_STREAM++"'>"] - ++ Payload; - _ -> - Payload - end, + C2SPid -> + case StreamTo of + {To, ""} -> + gen_fsm:send_event( + C2SPid, + {xmlstreamstart, "stream:stream", + [{"to", To}, + {"xmlns", ?NS_CLIENT}, + {"xmlns:stream", ?NS_STREAM}]}); + {To, Version} -> + gen_fsm:send_event( + C2SPid, + {xmlstreamstart, "stream:stream", + [{"to", To}, + {"xmlns", ?NS_CLIENT}, + {"version", Version}, + {"xmlns:stream", ?NS_STREAM}]}); + _ -> + ok + end, + MaxInactivity = get_max_inactivity(StreamTo, StateData#state.max_inactivity), - ?DEBUG("really sending now: ~s", [SendPacket]), - Receiver ! {tcp, StateData#state.socket, - list_to_binary(SendPacket)}, + MaxPause = get_max_inactivity(StreamTo, StateData#state.max_pause), + + ?DEBUG("really sending now: ~p", [Payload]), + lists:foreach( + fun({xmlstreamend, End}) -> + gen_fsm:send_event( + C2SPid, {xmlstreamend, End}); + (El) -> + gen_fsm:send_event( + C2SPid, {xmlstreamelement, El}) + end, Payload), Reply = ok, process_buffered_request(Reply, StateName, - StateData#state{waiting_input = false, - last_receiver = Receiver, - input = "", + StateData#state{input = queue:new(), rid = Rid, key = SaveKey, ctime = TNow, @@ -709,6 +726,7 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, last_poll = LastPoll, req_list = ReqList, max_inactivity = MaxInactivity, + max_pause = MaxPause, ip = IP }) end @@ -724,29 +742,42 @@ process_buffered_request(Reply, StateName, StateData) -> case lists:keysearch(Rid+1, 2, Requests) of {value, Request} -> ?DEBUG("Processing buffered request: ~p", [Request]), - NewRequests = Requests -- [Request], - handle_sync_event(Request, undefined, StateName, - StateData#state{unprocessed_req_list=NewRequests}); + NewRequests = lists:keydelete(Rid+1, 2, Requests), + handle_http_put_event( + Request, StateName, + StateData#state{unprocessed_req_list = NewRequests}); _ -> {reply, Reply, StateName, StateData} end. -handle_http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) -> - case http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) of +handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> + case http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) of {error, not_exists} -> - ?DEBUG("no session associated with sid: ~p", [Sid]), + ?ERROR_MSG("no session associated with sid: ~p", [Sid]), {404, ?HEADER, ""}; {{error, Reason}, Sess} -> - ?DEBUG("Error on HTTP put. Reason: ~p", [Reason]), + ?ERROR_MSG("Error on HTTP put. Reason: ~p", [Reason]), handle_http_put_error(Reason, Sess); {{repeat, OutPacket}, Sess} -> ?DEBUG("http_put said 'repeat!' ...~nOutPacket: ~p", [OutPacket]), send_outpacket(Sess, OutPacket); + {{wait, Pause}, _Sess} -> + ?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]), + timer:sleep(Pause), + %{200, ?HEADER, + % xml:element_to_string( + % {xmlelement, "body", + % [{"xmlns", ?NS_HTTP_BIND}, + % {"type", "error"}], []})}; + handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, + StreamStart, IP); + {buffered, _Sess} -> + {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"}; {ok, Sess} -> prepare_response(Sess, Rid, Attrs, StreamStart) end. -http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) -> +http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> ?DEBUG("Looking for session: ~p", [Sid]), case mnesia:dirty_read({http_bind, Sid}) of [] -> @@ -760,7 +791,9 @@ http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) -> "" end, {gen_fsm:sync_send_all_state_event( - FsmRef, {http_put, Rid, Attrs, Payload, Hold, NewStream, IP}), Sess} + FsmRef, #http_put{rid = Rid, attrs = Attrs, payload = Payload, + payload_size = PayloadSize, hold = Hold, + stream = NewStream, ip = IP}, 30000), Sess} end. handle_http_put_error(Reason, #http_bind{pid=FsmRef, version=Version}) @@ -803,6 +836,46 @@ handle_http_put_error(Reason, #http_bind{pid=FsmRef}) -> {403, ?HEADER, ""} end. +%% Control RID ordering +rid_allow(none, _NewRid, _Attrs, _Hold, _MaxPause) -> + %% First request - nothing saved so far + {true, 0}; +rid_allow(OldRid, NewRid, Attrs, Hold, MaxPause) -> + ?DEBUG("Previous rid / New rid: ~p/~p", [OldRid, NewRid]), + if + %% We did not miss any packet, we can process it immediately: + NewRid == OldRid + 1 -> + case catch list_to_integer( + xml:get_attr_s("pause", Attrs)) of + {'EXIT', _} -> + {true, 0}; + Pause1 when Pause1 =< MaxPause -> + ?DEBUG("got pause: ~p", [Pause1]), + {true, Pause1}; + _ -> + {true, 0} + end; + %% We have missed packets, we need to cached it to process it later on: + (OldRid < NewRid) and + (NewRid =< (OldRid + Hold + 1)) -> + buffer; + (NewRid =< OldRid) and + (NewRid > OldRid - Hold - 1) -> + repeat; + true -> + false + end. + +update_shaper(ShaperState, PayloadSize) -> + {NewShaperState, Pause} = shaper:update(ShaperState, PayloadSize), + if + Pause > 0 -> + ShaperTimer = erlang:start_timer(Pause, self(), activate), %% MR: Seems timer is not needed. Activate is not handled + {NewShaperState, ShaperTimer}; + true -> + {NewShaperState, undefined} + end. + prepare_response(#http_bind{id=Sid, wait=Wait, hold=Hold, to=To}=Sess, Rid, _, StreamStart) -> receive after 100 -> ok end, %% TODO: Why is this needed. Argh. Bad programming practice. @@ -816,62 +889,52 @@ prepare_response(#http_bind{id=Sid, wait=Wait, hold=Hold, to=To}=Sess, {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"}; {ok, terminate} -> {200, ?HEADER, "<body type='terminate' xmlns='"++?NS_HTTP_BIND++"'/>"}; - {ok, OutPacket} -> - ?DEBUG("OutPacket: ~s", [OutPacket]), + {ok, ROutPacket} -> + OutPacket = lists:reverse(ROutPacket), + ?DEBUG("OutPacket: ~p", [OutPacket]), case StreamStart of false -> send_outpacket(Sess, OutPacket); true -> - OutEls = - case xml_stream:parse_element( - OutPacket++"</stream:stream>") of - El when element(1, El) == xmlelement -> - ?DEBUG("~p", [El]), - {xmlelement, _, OutAttrs, Els} = El, - AuthID = xml:get_attr_s("id", OutAttrs), - From = xml:get_attr_s("from", OutAttrs), - Version = xml:get_attr_s("version", OutAttrs), - StreamError = false, - case Els of - [] -> - []; - [{xmlelement, "stream:features", - StreamAttribs, StreamEls} - | StreamTail] -> - [{xmlelement, "stream:features", - [{"xmlns:stream", - ?NS_STREAM} - ] - ++ StreamAttribs, - StreamEls - }] ++ StreamTail; - Xml -> - Xml - end; - {error, _} -> - AuthID = "", - From = "", - Version = "", - StreamError = true, - [] - end, - if - StreamError == true -> - {200, ?HEADER, "<body type='terminate' " - "condition='host-unknown' " - "xmlns='"++?NS_HTTP_BIND++"'/>"}; - true -> - BOSH_attribs = - [{"authid", AuthID}, - {"xmlns:xmpp", ?NS_BOSH}, - {"xmlns:stream", ?NS_STREAM}] ++ - case OutEls of - [] -> - []; - _ -> - [{"xmpp:version", Version}] - end, - MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY), + case OutPacket of + [{xmlstreamstart, _, OutAttrs} | Els] -> + AuthID = xml:get_attr_s("id", OutAttrs), + From = xml:get_attr_s("from", OutAttrs), + Version = xml:get_attr_s("version", OutAttrs), + OutEls = + case Els of + [] -> + []; + [{xmlstreamelement, + {xmlelement, "stream:features", + StreamAttribs, StreamEls}} + | StreamTail] -> + TypedTail = + [check_default_xmlns(OEl) || + {xmlstreamelement, OEl} <- + StreamTail], + [{xmlelement, "stream:features", + [{"xmlns:stream", + ?NS_STREAM}] ++ + StreamAttribs, StreamEls}] ++ + TypedTail; + StreamTail -> + [check_default_xmlns(OEl) || + {xmlstreamelement, OEl} <- + StreamTail] + end, + BOSH_attribs = + [{"authid", AuthID}, + {"xmlns:xmpp", ?NS_BOSH}, + {"xmlns:stream", ?NS_STREAM}] ++ + case OutEls of + [] -> + []; + _ -> + [{"xmpp:version", Version}] + end, + MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY), + MaxPause = get_max_pause(To), {200, ?HEADER, xml:element_to_string( {xmlelement,"body", @@ -882,16 +945,20 @@ prepare_response(#http_bind{id=Sid, wait=Wait, hold=Hold, to=To}=Sess, {"requests", integer_to_list(Hold+1)}, {"inactivity", integer_to_list( - trunc(MaxInactivity/1000))}, - {"maxpause", - integer_to_list(?MAX_PAUSE)}, + trunc(MaxInactivity/1000))}, + {"maxpause", + integer_to_list(MaxPause)}, {"polling", - integer_to_list( - trunc(?MIN_POLLING/1000000))}, - {"ver", ?BOSH_VERSION}, - {"from", From}, - {"secure", "true"} %% we're always being secure - ] ++ BOSH_attribs,OutEls})} + integer_to_list( + trunc(?MIN_POLLING/1000000))}, + {"ver", ?BOSH_VERSION}, + {"from", From}, + {"secure", "true"} %% we're always being secure + ] ++ BOSH_attribs,OutEls})}; + {error, _} -> + {200, ?HEADER, "<body type='terminate' " + "condition='host-unknown' " + "xmlns='"++?NS_HTTP_BIND++"'/>"} end end; {'EXIT', {shutdown, _}} -> @@ -906,77 +973,85 @@ http_get(#http_bind{pid = FsmRef, wait = Wait, hold = Hold}, Rid) -> send_outpacket(#http_bind{pid = FsmRef}, OutPacket) -> case OutPacket of - "" -> + [] -> {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"}; - "</stream:stream>" -> + [{xmlstreamend, _}] -> gen_fsm:sync_send_all_state_event(FsmRef,{stop,stream_closed}), {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"}; _ -> - case xml_stream:parse_element("<body>" - ++ OutPacket - ++ "</body>") - of - El when element(1, El) == xmlelement -> - {xmlelement, _, _, OEls} = El, + %% TODO: We parse to add a default namespace to packet, + %% The spec says adding the jabber:client namespace if + %% mandatory, even if some implementation do not do that + %% change on packets. + %% I think this should be an option to avoid modifying + %% packet in most case. + AllElements = + lists:all(fun({xmlstreamelement, + {xmlelement, "stream:error", _, _}}) -> false; + ({xmlstreamelement, _}) -> true; + (_) -> false + end, OutPacket), + case AllElements of + true -> TypedEls = [check_default_xmlns(OEl) || - OEl <- OEls], + {xmlstreamelement, OEl} <- OutPacket], + Body = xml:element_to_string( + {xmlelement,"body", + [{"xmlns", + ?NS_HTTP_BIND}], + TypedEls}), ?DEBUG(" --- outgoing data --- ~n~s~n --- END --- ~n", - [xml:element_to_string( - {xmlelement,"body", - [{"xmlns", - ?NS_HTTP_BIND}], - TypedEls})] - ), - {200, ?HEADER, - xml:element_to_string( - {xmlelement,"body", - [{"xmlns", - ?NS_HTTP_BIND}], - TypedEls})}; - {error, _E} -> - OutEls = case xml_stream:parse_element( - OutPacket++"</stream:stream>") of - SEl when element(1, SEl) == xmlelement -> - {xmlelement, _, _OutAttrs, SEls} = SEl, - StreamError = false, - case SEls of - [] -> - []; - [{xmlelement, - "stream:features", - StreamAttribs, StreamEls} | - StreamTail] -> - TypedTail = - [check_default_xmlns(OEl) || - OEl <- StreamTail], - [{xmlelement, - "stream:features", - [{"xmlns:stream", - ?NS_STREAM}] ++ - StreamAttribs, StreamEls}] ++ - TypedTail; - Xml -> - Xml - end; - {error, _} -> - StreamError = true, - [] - end, - if - StreamError -> + [Body]), + {200, ?HEADER, Body}; + false -> + case OutPacket of + [{xmlstreamstart, _, _} | SEls] -> + OutEls = + case SEls of + [{xmlstreamelement, + {xmlelement, + "stream:features", + StreamAttribs, StreamEls}} | + StreamTail] -> + TypedTail = + [check_default_xmlns(OEl) || + {xmlstreamelement, OEl} <- + StreamTail], + [{xmlelement, + "stream:features", + [{"xmlns:stream", + ?NS_STREAM}] ++ + StreamAttribs, StreamEls}] ++ + TypedTail; + StreamTail -> + [check_default_xmlns(OEl) || + {xmlstreamelement, OEl} <- + StreamTail] + end, + {200, ?HEADER, + xml:element_to_string( + {xmlelement,"body", + [{"xmlns", + ?NS_HTTP_BIND}], + OutEls})}; + _ -> + SErrCond = + lists:filter( + fun({xmlstreamelement, + {xmlelement, "stream:error", + _, _}}) -> + true; + (_) -> false + end, OutPacket), StreamErrCond = - case xml_stream:parse_element( - "<stream:stream>" ++ OutPacket) of - El when element(1, El) == xmlelement -> - case xml:get_subtag(El, "stream:error") of - false -> - null; - {xmlelement, _, _, _Cond} = StreamErrorTag -> - [StreamErrorTag] - end; - {error, _E} -> - null - end, + case SErrCond of + [] -> + null; + [{xmlstreamelement, + {xmlelement, _, _, _Cond} = + StreamErrorTag} | _] -> + [StreamErrorTag] + end, gen_fsm:sync_send_all_state_event(FsmRef, {stop, {stream_error,OutPacket}}), case StreamErrCond of @@ -993,27 +1068,22 @@ send_outpacket(#http_bind{pid = FsmRef}, OutPacket) -> "xmlns:stream='"++?NS_STREAM++"'>" ++ elements_to_string(StreamErrCond) ++ "</body>"} - end; - true -> - {200, ?HEADER, - xml:element_to_string( - {xmlelement,"body", - [{"xmlns", - ?NS_HTTP_BIND}], - OutEls})} - end + end + end end end. -parse_request(Data) -> +parse_request(_Data, PayloadSize, MaxStanzaSize) + when PayloadSize > MaxStanzaSize -> + {error, size_limit}; +parse_request(Data, _PayloadSize, _MaxStanzaSize) -> ?DEBUG("--- incoming data --- ~n~s~n --- END --- ", [Data]), + %% MR: I do not think it works if put put several elements in the + %% same body: case xml_stream:parse_element(Data) of - El when element(1, El) == xmlelement -> - {xmlelement, Name, Attrs, Els} = El, + {xmlelement, "body", Attrs, Els} -> Xmlns = xml:get_attr_s("xmlns",Attrs), if - Name /= "body" -> - {error, bad_request}; Xmlns /= ?NS_HTTP_BIND -> {error, bad_request}; true -> @@ -1021,7 +1091,7 @@ parse_request(Data) -> {'EXIT', _} -> {error, bad_request}; Rid -> - %% I guess this is to remove XMLCDATA: + %% I guess this is to remove XMLCDATA: Is it really needed ? FixedEls = lists:filter( fun(I) -> @@ -1032,28 +1102,22 @@ parse_request(Data) -> false end end, Els), - %% MR: I commented this code, because it is not used. -%% lists:map( -%% fun(E) -> -%% EXmlns = xml:get_tag_attr_s("xmlns",E), -%% if -%% EXmlns == ?NS_CLIENT -> -%% remove_tag_attr("xmlns",E); -%% true -> -%% ok -%% end -%% end, FixedEls), - Payload = [xml:element_to_string(E) || E <- FixedEls], Sid = xml:get_attr_s("sid",Attrs), - %% MR: I do not think we need to extract - %% Sid. We should have it somewhere else: - {ok, {Sid, Rid, Attrs, Payload}} + {ok, {Sid, Rid, Attrs, FixedEls}} end end; + {xmlelement, _Name, _Attrs, _Els} -> + {error, bad_request}; {error, _Reason} -> {error, bad_request} end. +send_receiver_reply(undefined, _Reply) -> + ok; +send_receiver_reply(Receiver, Reply) -> + gen_fsm:reply(Receiver, Reply). + + %% Cancel timer and empty message queue. cancel_timer(undefined) -> ok; @@ -1066,6 +1130,15 @@ cancel_timer(Timer) -> ok end. +%% If client asked for a pause (pause > 0), we apply the pause value +%% as inactivity timer: +set_inactivity_timer(Pause, _MaxInactivity) when Pause > 0 -> + erlang:start_timer(Pause*1000, self(), []); +%% Otherwise, we apply the max_inactivity value as inactivity timer: +set_inactivity_timer(_Pause, MaxInactivity) -> + erlang:start_timer(MaxInactivity, self(), []). + + %% TODO: Use tail recursion and list reverse ? elements_to_string([]) -> []; @@ -1084,11 +1157,15 @@ get_max_inactivity({Host, _}, Default) -> get_max_inactivity(_, Default) -> Default. -%% remove_tag_attr(Attr, {xmlelement, Name, Attrs, Els}) -> -%% Attrs1 = lists:keydelete(Attr, 1, Attrs), -%% {xmlelement, Name, Attrs1, Els}; -%% remove_tag_attr(Attr, El) -> -%% El. +get_max_pause({Host, _}) -> + gen_mod:get_module_opt(Host, mod_http_bind, max_pause, ?MAX_PAUSE); +get_max_pause(_) -> + ?MAX_PAUSE. + +%% Current time as integer +tnow() -> + {TMegSec, TSec, TMSec} = now(), + (TMegSec * 1000000 + TSec) * 1000000 + TMSec. check_default_xmlns({xmlelement, Name, Attrs, Els} = El) -> case xml:get_tag_attr_s("xmlns", El) of @@ -1101,6 +1178,6 @@ check_default_xmlns({xmlelement, Name, Attrs, Els} = El) -> check_bind_module(XmppDomain) -> case gen_mod:is_loaded(XmppDomain, mod_http_bind) of true -> ok; - false -> ?ERROR_MSG("You are trying to use HTTP Bind (BOSH), but the module mod_http_bind is not started.~n" + false -> ?ERROR_MSG("You are trying to use BOSH (HTTP Bind), but the module mod_http_bind is not started.~n" "Check your 'modules' section in your ejabberd configuration file.",[]) end. |