summaryrefslogblamecommitdiff
path: root/src/ejabberd_http_bind.erl
blob: 758c1cee521faeaa791e16eb7d6dc271cc409277 (plain) (tree)
1
2
3
4
5
6
7
8
9


                                                                         
                                                  
                                                                       
                                                        

   
                                                  














                                                                           


                                                                         
 


                              


                    
                      






                             
                    
                    
                    
                   
                               




                             
                 
                 
                                
                           


                            
                             
 
                         
                       
 
                     
 
                              
 
                          
 

                                                           
 

                                      
                       

                  
                    

                   
                           
                    
                       
                            
                                    
                                      

                             

                              
                              
                                              
                           

                          
                          
                                                                                                                         
                                                         
                               
                          
                               

                  







                                

                        
               
 
                                     
 
      
 
                     
 

       

                                                                                           
 















                                                   
 
                                    
 
                              
 

                                 
                                              
 
                                         
                                   


                                                  

                               


                                                      
 

                                                                 
 
                                         

                                                      
 
                                             

                                                          
 
                                          

                                              
                                                                     



              
                                         
 



                                            

                                                              




                                                  

                                                          



                                    
                                  

                                                           
 
                                      
 
                                               
 

                                                                        
                                   

                                                 





                                                            
                                    




                                               
                                                  






                                                                        
                                                       
                                                                                                
                                                                  






                                                                        
                                                                             




                                                                    
                                                                




                                                  
                                                               



















                                                                             

        

                                                      
                                 
                                                          






                                                     
               
                                                          






                                                                       
               
                 
                                                                  









                                                                       
                 
                        
                                                                                     



                                   
                                                             
                                        









                                        
                                                                     
 



                                                                         
                              
                                            
                                                 
                                                                

                                                             
                                                                 




                                                                

                                     

                                                               
                                                            
                               
                                
                                      
                                                 
                                                       
                                             

                                      
                                                                  
                                 








                                                          
        

                                                
                                        

                                                     


                                             



                                                                   

                                       



                                                                      



                                                       







                                                                 



                                                                         

                             

                                                                
                                     
 
                                                                

                                     



                                                                        
                                                                                 

                                     
                                            








                                                             


                                                        


                                              
                                                 
                                         
                                      

                                                   


                                                                          
                                       
                                                                             
                                                     
                     
                                                                           
                                                               
                                        


                                                                            
                                                                    
                                                
                                                    
                                                           
                                                                                        
                                                             
                                                          
               
                                                
                                                 

                                                      





















                                                                                    
        

                                             

                                         


                                                     



                                                     
                                            



                                                           
                              

                                                          















                                                                                
        



                                                                       


                                       
                                            

                                            
                                                         

                                                      
                                         

                                                  






                                                                         
                        


                                                             
                                              


                                                          
                    





















                                                                       

        



                                                                             
                                                   
                                                         

                                                  






                                                
                                                     






                                                                           
                                                     

                                        
                   
























































































































                                                                                          




                                                        







                                                                      

        





















                                                                

        

                                               
                                             
                                               
             















                                                       

        




                                                                   
                  

                        
                                                           







                                                                           
                                                           







                                                                           
                                                           





                                                                           
        



                                                                              
                  










                                                             

        

                                                     

                                                   

                                           
      


                                                                   
                                                                    












                                                                             


                                          






                                                          

        
                                                      
                                                          
                                     
























                                                                          
        
 
                                              

                                                 
                                                 





                                                           

                                                     



                                                                          
                     
                                              


                                                             



































                                                                                
                                                                 





























                                                                                                         

        





                                                                  
 
                                                      
                     





















                                                                              
                                                                                                   

































                                                                               
                                                                       




























                                                                                       
                            









                                                                              
        
 
                                                  


                                                  
                                           

                                              
                                                      


                                                              
                                                                         










                                                                          
                                                               







                                                                

        
                                             


                                       
                                        
                              

                               
                                                        
 

                                                                    


                                                 



                                                                    
                             
                                 
                                                           
 


                                                      





                                                                        
        
                                          
 
                           



                                                                   
 


                                                      
                                                




                                                             
        
                              
 

                                                            

                                                        











                                                                  
        
%%%----------------------------------------------------------------------
%%% File    : ejabberd_http_bind.erl
%%% Author  : Stefan Strigler <steve@zeank.in-berlin.de>
%%% Purpose : Implements XMPP over BOSH (XEP-0206)
%%% Created : 21 Sep 2005 by Stefan Strigler <steve@zeank.in-berlin.de>
%%% Modified: may 2009 by Mickael Remond, Alexey Schepin
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2016   ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------

%% One gen_fsm process is started per HTTP-bind session (see start/5 and
%% start_link/4); process_request/3 is the entry point for request bodies.
-module(ejabberd_http_bind).

%% XEP numbers and versions declared by this module.
-protocol({xep, 124, '1.11'}).
-protocol({xep, 206, '1.4'}).

-behaviour(gen_fsm).

%% External exports
%% The list mixes three groups:
%%  - gen_fsm callbacks: init/1, handle_event/3, handle_sync_event/4,
%%    code_change/4, handle_info/3, terminate/3;
%%  - the socket-module API (init/1 passes ?MODULE as the socket module to
%%    ejabberd_socket:start/4): send/2, send_xml/2, sockname/1, peername/1,
%%    setopts/2, controlling_process/2, become_controller/2,
%%    custom_receiver/1, reset_stream/1, change_shaper/2, monitor/1, close/1;
%%  - session start-up and HTTP request handling: start_link/4, start/5,
%%    process_request/3, handle_session_start/8, handle_http_put/7,
%%    http_put/7, http_get/2, prepare_response/4.
-export([start_link/4,
	 init/1,
	 handle_event/3,
	 handle_sync_event/4,
	 code_change/4,
	 handle_info/3,
	 terminate/3,
	 send/2,
	 send_xml/2,
	 sockname/1,
	 peername/1,
	 setopts/2,
	 controlling_process/2,
	 become_controller/2,
	 custom_receiver/1,
	 reset_stream/1,
	 change_shaper/2,
	 monitor/1,
	 close/1,
	 start/5,
	 handle_session_start/8,
	 handle_http_put/7,
	 http_put/7,
	 http_get/2,
	 prepare_response/4,
	 process_request/3]).

-include("ejabberd.hrl").
-include("logger.hrl").

-include("jlib.hrl").

-include("ejabberd_http.hrl").

-include("http_bind.hrl").

%% Session registry entry, written with mnesia:dirty_write/1 in
%% handle_session_start/8 and deleted in terminate/3.
%%   id            - session id (Sid)
%%   pid           - pid of the session's gen_fsm process
%%   to            - {XmppDomain, XmppVersion} taken from the first request
%%   hold, wait    - values negotiated from the client's 'hold'/'wait' attrs
%%   process_delay - value negotiated from the 'process-delay' attr
%%   version       - float parsed from the 'ver' attr (0.0 when unparsable)
-record(http_bind,
	{id, pid, to, hold, wait, process_delay, version}).

%% Placeholder {Address, Port} pair: returned by sockname/1 and used as the
%% default of #state.ip.
-define(NULL_PEER, {{0, 0, 0, 0}, 0}).

%% http binding request
%% One entry of the #state.req_list cache: the request id (rid), the key
%% that was current in the session state when the entry was stored, and the
%% output (out) that was returned for that request.
-record(hbr, {rid,
	      key,
	      out}).

%% gen_fsm state data of one HTTP-bind session.
%%   id                    - session id (Sid given to init/1)
%%   rid                   - request id the session compares incoming Rids
%%                           against (see the http_get handler)
%%   key                   - key given to init/1; copied into #hbr entries
%%   socket                - {http_bind, FsmPid, IP} handle built in init/1
%%   output                - packets queued for the client, newest first
%%   input                 - queue of events for the C2S process, flushed on
%%                           become_controller; may also be the atom 'cancel'
%%   waiting_input         - false, or the C2S pid set by become_controller
%%   shaper_state          - result of shaper:new/1
%%   shaper_timer          - timer reference while a shaper pause is running,
%%                           undefined otherwise
%%   last_receiver,
%%   last_poll             - not used in this part of the file
%%   http_receiver         - gen_fsm 'From' of the HTTP request currently held
%%                           open, or undefined
%%   out_of_order_receiver - true when the held request has a Rid above the
%%                           session's rid
%%   wait_timer            - timer bounding how long a request is held open
%%   ctime                 - compared against a microsecond timestamp in the
%%                           http_get handler
%%   timer                 - inactivity timer; its expiry stops the session
%%   pause                 - passed to set_inactivity_timer/2; a held request
%%                           requires it to be 0
%%   max_inactivity,
%%   max_pause             - initialised from ?MAX_INACTIVITY / ?MAX_PAUSE
%%   ip                    - peer address reported by the 'peername' event
-record(state, {id,
		rid = none,
		key,
		socket,
		output = "",
		input = queue:new(),
		waiting_input = false,
		shaper_state,
		shaper_timer,
		last_receiver,
		last_poll,
		http_receiver,
		out_of_order_receiver = false,
		wait_timer,
		ctime = 0,
		timer,
		pause = 0,
		unprocessed_req_list = [], % list of request that have been delayed for proper reordering: {Request, PID}
		req_list = [], % list of requests (cache)
		max_inactivity,
		max_pause,
		ip = ?NULL_PEER
	       }).

%% Internal request format:
%% The record sent to the session process as a sync all-state event for an
%% incoming HTTP request. handle_sync_event/4 reads 'rid' and 'payload_size'
%% directly; the remaining fields are consumed outside this part of the file.
-record(http_put, {rid,
		   attrs,
		   payload,
		   payload_size,
		   hold,
		   stream,
		   ip}).

%% Options passed to gen_fsm:start/3 and gen_fsm:start_link/3.
%% Uncomment the DBGFSM definition below (or define it at compile time) to
%% start every session FSM with the [{debug, [trace]}] option.
%%-define(DBGFSM, true).
-ifdef(DBGFSM).

-define(FSMOPTS, [{debug, [trace]}]).

-else.

-define(FSMOPTS, []).

-endif.

%% NOTE(review): presumably the BOSH version advertised to clients; the use
%% site is outside this part of the file - confirm.
-define(BOSH_VERSION, <<"1.8">>).

%% XML namespaces. NS_HTTP_BIND is the xmlns of the <body/> elements built in
%% process_request/3.
-define(NS_CLIENT, <<"jabber:client">>).

-define(NS_BOSH, <<"urn:xmpp:xbosh">>).

-define(NS_HTTP_BIND,
	<<"http://jabber.org/protocol/httpbind">>).

%% The client's 'hold' attribute is capped at ?MAX_REQUESTS - 1.
-define(MAX_REQUESTS, 2).

%% NOTE(review): not used in this part of the file; the magnitude suggests
%% microseconds - confirm at the use site.
-define(MIN_POLLING, 2000000).

%% Default and upper bound for the client's 'wait' attribute. The http_get
%% handler multiplies its Wait by 1000 for a millisecond timer, which
%% suggests seconds - confirm that the same value is passed there.
-define(MAX_WAIT, 3600).

%% Inactivity timeout in milliseconds (passed to erlang:start_timer/3).
-define(MAX_INACTIVITY, 30000).

%% Stored in #state.max_pause; the unit is not visible in this part of the
%% file.
-define(MAX_PAUSE, 120).

%% Wait 100ms before continuing processing, to allow the client to provide
%% more related stanzas. The client may request another value through the
%% 'process-delay' attribute; it is clamped to the MIN..MAX range below.
-define(PROCESS_DELAY_DEFAULT, 100).

-define(PROCESS_DELAY_MIN, 0).

-define(PROCESS_DELAY_MAX, 1000).

%% NOTE(review): not used in this part of the file; presumably the registered
%% process name of mod_http_bind - confirm.
-define(PROCNAME_MHB, ejabberd_mod_http_bind).

%% Start an unlinked session FSM for session id Sid.
%% Returns {ok, Pid} on success. On any other outcome (including an exception
%% raised by gen_fsm:start/3) check_bind_module/1 is called for XMPPDomain and
%% {error, Message} is returned.
start(XMPPDomain, Sid, Key, IP, HOpts) ->
    ?DEBUG("Starting session", []),
    InitArgs = [Sid, Key, IP, HOpts],
    case catch gen_fsm:start(?MODULE, InitArgs, ?FSMOPTS) of
        {ok, _Pid} = Started ->
            Started;
        _Failure ->
            check_bind_module(XMPPDomain),
            {error, "Cannot start HTTP bind session"}
    end.

%% Start a session FSM linked to the caller.
%% The four arguments are handed to init/1 as a single list.
start_link(Sid, Key, IP, HOpts) ->
    InitArgs = [Sid, Key, IP, HOpts],
    gen_fsm:start_link(?MODULE, InitArgs, ?FSMOPTS).

%% Synchronously deliver Packet to the session FSM as a {send, Packet}
%% all-state event and return the FSM's reply.
send({http_bind, FsmRef, _IP}, Packet) ->
    Request = {send, Packet},
    gen_fsm:sync_send_all_state_event(FsmRef, Request).

%% Synchronously deliver Packet to the session FSM as a {send_xml, Packet}
%% all-state event and return the FSM's reply.
send_xml({http_bind, FsmRef, _IP}, Packet) ->
    Request = {send_xml, Packet},
    gen_fsm:sync_send_all_state_event(FsmRef, Request).

%% Socket-option hook. Only {active, once} has an effect: it sends an
%% {activate, CallerPid} all-state event to the session FSM. Every other
%% option list is accepted and ignored. Always returns ok.
setopts({http_bind, FsmRef, _IP}, Opts) ->
    WantsActivation = lists:member({active, once}, Opts),
    case WantsActivation of
        true ->
            gen_fsm:send_all_state_event(FsmRef, {activate, self()});
        false ->
            ok
    end.

%% No-op: both arguments are ignored and ok is returned.
controlling_process(_Sock, _NewOwner) ->
    ok.

%% Name this module and the session FSM as the receiver for the socket:
%% returns {receiver, Module, FsmRef}.
custom_receiver({http_bind, FsmRef, _IP}) ->
    Receiver = {receiver, ?MODULE, FsmRef},
    Receiver.

%% Asynchronously tell the session FSM that C2SPid is now the process that
%% should receive its input events (see handle_event/3).
become_controller(FsmRef, C2SPid) ->
    Event = {become_controller, C2SPid},
    gen_fsm:send_all_state_event(FsmRef, Event).

%% No-op for HTTP-bind sockets: accepts the socket handle and returns ok.
reset_stream({http_bind, _Fsm, _Peer}) -> ok.

%% Asynchronously ask the session FSM to switch to Shaper
%% (handled by the {change_shaper, Shaper} clause of handle_event/3).
change_shaper({http_bind, FsmRef, _IP}, Shaper) ->
    Event = {change_shaper, Shaper},
    gen_fsm:send_all_state_event(FsmRef, Event).

%% Set up a process monitor on the session FSM and return the monitor
%% reference.
monitor({http_bind, FsmRef, _IP}) ->
    MonitorRef = erlang:monitor(process, FsmRef),
    MonitorRef.

%% Ask the session FSM to stop with a synchronous {stop, close} event.
%% The call is wrapped in 'catch', so a failing call (for example when the
%% FSM is already gone) yields an {'EXIT', _} term instead of an exception.
close({http_bind, FsmRef, _IP}) ->
    StopRequest = {stop, close},
    catch gen_fsm:sync_send_all_state_event(FsmRef, StopRequest).

%% The local address is not tracked: always returns {ok, ?NULL_PEER}.
sockname(_Sock) ->
    {ok, ?NULL_PEER}.

%% Return the peer address carried inside the socket handle as {ok, IP}.
peername({http_bind, _Fsm, Peer}) ->
    {ok, Peer}.


%% Entry point for data coming from client through ejabberd HTTP server.
%%
%% Data  - raw request body (iodata); its size is checked against the
%%         max_stanza_size c2s limit inside parse_request/3.
%% IP    - peer address, passed through to the session unchanged.
%% HOpts - HTTP listener options, passed to start/5 for new sessions.
%%
%% Depending on what parse_request/3 yields:
%%  - no session id and no 'to' attribute: a 200 response carrying a
%%    terminate/improper-addressing body;
%%  - no session id: a new session FSM is started and the request is handed
%%    to handle_session_start/8 (500 if the FSM cannot be started);
%%  - existing session id: the request is handed to handle_http_put/7;
%%  - {size_limit, Sid}: the session, if it exists, is stopped and a
%%    terminate body is returned; 404 if the session is unknown;
%%  - anything else (including an exception in the parser): 400.
process_request(Data, IP, HOpts) ->
    Opts1 = ejabberd_c2s_config:get_c2s_limits(),
    Opts = [{xml_socket, true} | Opts1],
    MaxStanzaSize = case lists:keysearch(max_stanza_size, 1, Opts) of
                        {value, {_, Size}} -> Size;
                        _ -> infinity
                    end,
    PayloadSize = iolist_size(Data),
    case catch parse_request(Data, PayloadSize, MaxStanzaSize) of
        %% No existing session:
        {ok, {<<"">>, Rid, Attrs, Payload}} ->
            case fxml:get_attr_s(<<"to">>, Attrs) of
                <<"">> ->
                    ?DEBUG("Session not created (Improper addressing)", []),
                    {200, ?HEADER,
                     <<"<body type='terminate' condition='improper-ad"
                       "dressing' xmlns='",
                       (?NS_HTTP_BIND)/binary, "'/>">>};
                XmppDomain ->
                    NXmppDomain = jid:nameprep(XmppDomain),
                    %% Session id: hash of a monotonic timestamp and a fresh
                    %% reference.
                    Sid = p1_sha:sha(term_to_binary({p1_time_compat:monotonic_time(), make_ref()})),
                    case start(NXmppDomain, Sid, <<"">>, IP, HOpts) of
                        {error, _} ->
                            {500, ?HEADER,
                             <<"<body type='terminate' condition='internal-se"
                               "rver-error' xmlns='",
                               (?NS_HTTP_BIND)/binary,
                               "'>Internal Server Error</body>">>};
                        {ok, Pid} ->
                            handle_session_start(Pid, NXmppDomain, Sid, Rid,
                                                 Attrs, Payload, PayloadSize,
                                                 IP)
                    end
            end;
        %% Existing session
        {ok, {Sid, Rid, Attrs, Payload1}} ->
            StreamStart = case fxml:get_attr_s(<<"xmpp:restart">>, Attrs) of
                              <<"true">> -> true;
                              _ -> false
                          end,
            %% A terminating request also closes the XMPP stream.
            Payload2 = case fxml:get_attr_s(<<"type">>, Attrs) of
                           <<"terminate">> ->
                               Payload1 ++ [{xmlstreamend, <<"stream:stream">>}];
                           _ -> Payload1
                       end,
            handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize,
                            StreamStart, IP);
        {size_limit, Sid} ->
            %% mnesia:dirty_read/1 returns a (possibly empty) list of
            %% records; it never returns {ok, _} or {error, _}.
            case mnesia:dirty_read({http_bind, Sid}) of
                [] ->
                    {404, ?HEADER, <<"">>};
                [#http_bind{pid = FsmRef} | _] ->
                    gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}),
                    {200, ?HEADER,
                     <<"<body type='terminate' condition='undefined-c"
                       "ondition' xmlns='",
                       (?NS_HTTP_BIND)/binary, "'>Request Too Large</body>">>}
            end;
        _ ->
            ?DEBUG("Received bad request: ~p", [Data]),
            {400, ?HEADER, <<"">>}
    end.

%% Register a freshly started session and process its first request.
%%
%% The 'wait', 'hold' and 'process-delay' attributes of the opening request
%% are negotiated as follows:
%%  - wait:          default and upper bound ?MAX_WAIT;
%%  - hold:          default and upper bound ?MAX_REQUESTS - 1;
%%  - process-delay: default ?PROCESS_DELAY_DEFAULT, otherwise clamped to
%%                   ?PROCESS_DELAY_MIN..?PROCESS_DELAY_MAX.
%% 'ver' is parsed as a float (0.0 when it cannot be parsed). The result is
%% stored as an #http_bind record keyed by Sid, and the request is then
%% passed to handle_http_put/7 with the stream-start flag set to true.
handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,
                     Payload, PayloadSize, IP) ->
    ?DEBUG("got pid: ~p", [Pid]),
    IntAttr = fun (Name) ->
                      str:to_integer(fxml:get_attr_s(Name, Attrs))
              end,
    Wait = case IntAttr(<<"wait">>) of
               {error, _} -> ?MAX_WAIT;
               {CWait, _} -> erlang:min(CWait, ?MAX_WAIT)
           end,
    HoldLimit = (?MAX_REQUESTS) - 1,
    Hold = case IntAttr(<<"hold">>) of
               {error, _} -> HoldLimit;
               {CHold, _} -> erlang:min(CHold, HoldLimit)
           end,
    Pdelay = case IntAttr(<<"process-delay">>) of
                 {error, _} ->
                     ?PROCESS_DELAY_DEFAULT;
                 {CPdelay, _} ->
                     erlang:max(erlang:min(CPdelay, ?PROCESS_DELAY_MAX),
                                ?PROCESS_DELAY_MIN)
             end,
    %% Any failure while reading or parsing 'ver' falls back to 0.0.
    Version = case catch list_to_float(
                           binary_to_list(
                             fxml:get_attr_s(<<"ver">>, Attrs)))
              of
                  {'EXIT', _} -> 0.0;
                  Parsed -> Parsed
              end,
    XmppVersion = fxml:get_attr_s(<<"xmpp:version">>, Attrs),
    ?DEBUG("Create session: ~p", [Sid]),
    Session = #http_bind{id = Sid,
                         pid = Pid,
                         to = {XmppDomain, XmppVersion},
                         hold = Hold,
                         wait = Wait,
                         process_delay = Pdelay,
                         version = Version},
    mnesia:dirty_write(Session),
    handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, true, IP).

%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------

%% gen_fsm init callback.
%% Builds the option list for the C2S side (xml_socket plus a fixed subset of
%% the HTTP listener options plus the c2s limits), creates a 'none' shaper,
%% registers this process as an http_bind socket with ejabberd_socket, and
%% arms the inactivity timer. The FSM starts in state 'loop'.
init([Sid, Key, IP, HOpts]) ->
    ?DEBUG("started: ~p", [{Sid, Key, IP}]),
    Opts1 = ejabberd_c2s_config:get_c2s_limits(),
    %% Only these listener options are forwarded to the C2S process.
    ForwardedKeys = [stream_management, max_ack_queue, resume_timeout,
                     max_resume_timeout, resend_on_timeout],
    SOpts = lists:filter(fun ({OptName, _}) ->
                                 lists:member(OptName, ForwardedKeys);
                             (_) ->
                                 false
                         end,
                         HOpts),
    Opts = [{xml_socket, true}] ++ SOpts ++ Opts1,
    ShaperState = shaper:new(none),
    Socket = {http_bind, self(), IP},
    ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, Opts),
    Timer = erlang:start_timer(?MAX_INACTIVITY, self(), []),
    InitialState = #state{id = Sid,
                          key = Key,
                          socket = Socket,
                          shaper_state = ShaperState,
                          max_inactivity = ?MAX_INACTIVITY,
                          max_pause = ?MAX_PAUSE,
                          timer = Timer},
    {ok, loop, InitialState}.

%% gen_fsm callback for asynchronous all-state events.
%%  - {become_controller, C2SPid}: remember C2SPid as the input consumer.
%%    Unless the input buffer is the atom 'cancel', every buffered event is
%%    first sent to C2SPid and the buffer is reset to an empty queue.
%%  - {change_shaper, Shaper}: replace the shaper state with a new one.
%%  - any other event is ignored.
%% The FSM state name never changes here.
handle_event({become_controller, C2SPid}, StateName, StateData) ->
    NextData =
        case StateData#state.input of
            cancel ->
                StateData#state{waiting_input = C2SPid};
            Pending ->
                Forward = fun (Event) -> C2SPid ! Event end,
                lists:foreach(Forward, queue:to_list(Pending)),
                StateData#state{input = queue:new(),
                                waiting_input = C2SPid}
        end,
    {next_state, StateName, NextData};
handle_event({change_shaper, Shaper}, StateName, StateData) ->
    {next_state, StateName,
     StateData#state{shaper_state = shaper:new(Shaper)}};
handle_event(_Other, StateName, StateData) ->
    {next_state, StateName, StateData}.

%% gen_fsm callback for synchronous all-state events. The FSM state name is
%% never changed here; all session state lives in the #state record.
%%
%% {send_xml, Packet} with no HTTP request currently held open: prepend the
%% packet to the output buffer and reply ok.
handle_sync_event({send_xml, Packet}, _From, StateName,
		  #state{http_receiver = undefined} = StateData) ->
    Output = [Packet | StateData#state.output],
    Reply = ok,
    {reply, Reply, StateName,
     StateData#state{output = Output}};
%% {send_xml, Packet} while the held request is flagged out-of-order: buffer
%% the packet as well instead of answering that request.
handle_sync_event({send_xml, Packet}, _From, StateName,
		  #state{out_of_order_receiver = true} = StateData) ->
    Output = [Packet | StateData#state.output],
    Reply = ok,
    {reply, Reply, StateName,
     StateData#state{output = Output}};
%% {send_xml, Packet} while an in-order request is held: answer that request
%% with the whole output buffer, restart the inactivity timer, cancel the
%% wait timer and cache the response in req_list under the current rid
%% (replacing any cached entry with the same rid).
handle_sync_event({send_xml, Packet}, _From, StateName,
		  StateData) ->
    Output = [Packet | StateData#state.output],
    cancel_timer(StateData#state.timer),
    Timer = set_inactivity_timer(StateData#state.pause,
				 StateData#state.max_inactivity),
    HTTPReply = {ok, Output},
    gen_fsm:reply(StateData#state.http_receiver, HTTPReply),
    cancel_timer(StateData#state.wait_timer),
    Rid = StateData#state.rid,
    ReqList = [#hbr{rid = Rid, key = StateData#state.key,
		    out = Output}
	       | [El
		  || El <- StateData#state.req_list, El#hbr.rid /= Rid]],
    Reply = ok,
    {reply, Reply, StateName,
     StateData#state{output = [], http_receiver = undefined,
		     req_list = ReqList, wait_timer = undefined,
		     timer = Timer}};

%% {stop, Reason}: reply ok and stop the FSM with reason 'normal'. Only
%% reasons other than 'close' and 'stream_closed' are logged.
handle_sync_event({stop,close}, _From, _StateName, StateData) ->
    Reply = ok,
    {stop, normal, Reply, StateData};
handle_sync_event({stop,stream_closed}, _From, _StateName, StateData) ->
    Reply = ok,
    {stop, normal, Reply, StateData};
handle_sync_event({stop,Reason}, _From, _StateName, StateData) ->
    ?DEBUG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]),
    Reply = ok,
    {stop, normal, Reply, StateData};
%% HTTP PUT: Receive packets from the client
%% While a shaper timer is set the request is not processed: the caller gets
%% {wait, Pause}, where Pause is what erlang:read_timer/1 reports for the
%% shaper timer (0 when it reports false).
handle_sync_event(#http_put{rid = Rid}, _From,
		  StateName, StateData)
    when StateData#state.shaper_timer /= undefined ->
    Pause = case
	      erlang:read_timer(StateData#state.shaper_timer)
		of
	      false -> 0;
	      P -> P
	    end,
    Reply = {wait, Pause},
    ?DEBUG("Shaper timer for RID ~p: ~p", [Rid, Reply]),
    {reply, Reply, StateName, StateData};
%% No shaper timer set: update the shaper with the payload size, then hand
%% the request to handle_http_put_event/3.
handle_sync_event(#http_put{payload_size =
				PayloadSize} =
		      Request,
		  _From, StateName, StateData) ->
    ?DEBUG("New request: ~p", [Request]),
    {NewShaperState, NewShaperTimer} =
	update_shaper(StateData#state.shaper_state,
		      PayloadSize),
    handle_http_put_event(Request, StateName,
			  StateData#state{shaper_state = NewShaperState,
					  shaper_timer = NewShaperTimer});
%% HTTP GET: send packets to the client
%% The request is held open (no immediate reply) only when all of these hold:
%% Hold > 0; there is no buffered output or the request's Rid is above the
%% session rid; fewer than Wait seconds have passed since ctime; the session
%% rid is not above Rid; and no pause is active. Otherwise the buffered
%% output is returned at once and cached in req_list under Rid.
handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) ->
    TNow = p1_time_compat:system_time(micro_seconds),
    if (Hold > 0) and
	((StateData#state.output == []) or (StateData#state.rid < Rid)) and
	((TNow - StateData#state.ctime) < (Wait*1000*1000)) and
	(StateData#state.rid =< Rid) and
	(StateData#state.pause == 0) ->
	    %% Release any previously held request with an empty reply, then
	    %% hold this one: the wait timer (Wait * 1000 ms) bounds how long
	    %% it stays open, and the inactivity timer is cancelled meanwhile.
	    send_receiver_reply(StateData#state.http_receiver, {ok, empty}),
	    cancel_timer(StateData#state.wait_timer),
	    WaitTimer = erlang:start_timer(Wait * 1000, self(), []),
	    cancel_timer(StateData#state.timer),
	    {next_state, StateName, StateData#state{
				      http_receiver = From,
				      out_of_order_receiver = StateData#state.rid < Rid,
				      wait_timer = WaitTimer,
				      timer = undefined}};
	true ->
	    cancel_timer(StateData#state.timer),
	    Reply = {ok, StateData#state.output},
	    ReqList = [#hbr{rid = Rid,
			    key = StateData#state.key,
			    out = StateData#state.output}
		      | [El
			 || El <- StateData#state.req_list,
			    El#hbr.rid /= Rid]],
	    %% If an out-of-order request is being held, keep holding it (its
	    %% wait timer stays armed and the inactivity timer stays off) and
	    %% only clear the flag. Otherwise release any held request with an
	    %% empty reply and restart the inactivity timer.
	    if (StateData#state.http_receiver /= undefined) and
			StateData#state.out_of_order_receiver ->
		    {reply, Reply, StateName,
			StateData#state{output = [], timer = undefined,
			    req_list = ReqList,
			    out_of_order_receiver = false}};
		true ->
		    send_receiver_reply(StateData#state.http_receiver, {ok, empty}),
		    cancel_timer(StateData#state.wait_timer),
		    Timer = set_inactivity_timer(StateData#state.pause,
			    StateData#state.max_inactivity),
		    {reply, Reply, StateName,
			StateData#state{output = [],
			    http_receiver = undefined,
			    wait_timer = undefined,
			    timer = Timer,
			    req_list = ReqList}}
	    end
    end;
%% peername: report the peer address stored in the session state.
handle_sync_event(peername, _From, StateName,
		  StateData) ->
    Reply = {ok, StateData#state.ip},
    {reply, Reply, StateName, StateData};
%% Any other synchronous event is acknowledged with ok and otherwise ignored.
handle_sync_event(_Event, _From, StateName,
		  StateData) ->
    Reply = ok, {reply, Reply, StateName, StateData}.

%% gen_fsm code-upgrade callback.
%% No state migration is performed: the current state name and state data
%% are handed back unchanged.
code_change(_FromVsn, CurrentState, Data, _Extra) ->
    {ok, CurrentState, Data}.

%% gen_fsm callback for plain messages. Only timer expirations are acted
%% upon; the timer reference carried in the message is matched against the
%% references stored in the state record to tell the timers apart.

%% Inactivity timer fired: log it and stop the session process.
handle_info({timeout, TRef, _Msg}, _StateName,
            #state{timer = TRef, id = SID} = StateData) ->
    ?INFO_MSG("Session timeout. Closing the HTTP bind "
              "session: ~p",
              [SID]),
    {stop, normal, StateData};
%% Wait timer fired but no HTTP request is currently held: nothing to do.
handle_info({timeout, TRef, _Msg}, StateName,
            #state{wait_timer = TRef, http_receiver = undefined} = StateData) ->
    {next_state, StateName, StateData};
%% Wait timer fired while a request is held: answer it with an empty
%% body, record the empty answer for the current rid and re-arm the
%% inactivity timer.
handle_info({timeout, TRef, _Msg}, StateName,
            #state{wait_timer = TRef, http_receiver = Receiver} = StateData) ->
    cancel_timer(StateData#state.timer),
    InactivityTimer = set_inactivity_timer(StateData#state.pause,
                                           StateData#state.max_inactivity),
    gen_fsm:reply(Receiver, {ok, empty}),
    CurrentRid = StateData#state.rid,
    OtherReqs = [Req || Req <- StateData#state.req_list,
                        Req#hbr.rid /= CurrentRid],
    EmptyAnswer = #hbr{rid = CurrentRid, key = StateData#state.key, out = []},
    {next_state, StateName,
     StateData#state{http_receiver = undefined,
                     req_list = [EmptyAnswer | OtherReqs],
                     wait_timer = undefined,
                     timer = InactivityTimer}};
%% Shaper timer fired: just forget the timer reference.
handle_info({timeout, TRef, _Msg}, StateName,
            #state{shaper_timer = TRef} = StateData) ->
    {next_state, StateName, StateData#state{shaper_timer = undefined}};
%% Anything else is ignored.
handle_info(_Other, StateName, StateData) ->
    {next_state, StateName, StateData}.

%% gen_fsm termination callback.
%% Removes the session row from the http_bind mnesia table, answers any
%% HTTP request that is still being held with {ok, terminate}, and, when a
%% process is recorded in waiting_input, sends it a 'closed' event.
terminate(_Reason, _StateName, StateData) ->
    #state{id = SessionId,
           http_receiver = Receiver,
           waiting_input = WaitingInput} = StateData,
    ?DEBUG("terminate: Deleting session ~s", [SessionId]),
    mnesia:dirty_delete({http_bind, SessionId}),
    send_receiver_reply(Receiver, {ok, terminate}),
    %% waiting_input is either 'false' or the pid to notify.
    WaitingInput =:= false
        orelse gen_fsm:send_event(WaitingInput, closed),
    ok.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

%% PUT / Get processing:
%% Entry point for an incoming PUT once the shaper has been updated.
%% rid_allow/5 decides what to do with the request id:
%%   * buffer - the request arrived ahead of its turn; it is stored in
%%     unprocessed_req_list (replacing any earlier copy with the same rid)
%%     and the caller gets 'ok' straight away;
%%   * anything else - handed over to process_http_put/4.
handle_http_put_event(#http_put{rid = Rid, attrs = Attrs, hold = Hold} = Request,
                      StateName, StateData) ->
    ?DEBUG("New request: ~p", [Request]),
    case rid_allow(StateData#state.rid, Rid, Attrs, Hold,
                   StateData#state.max_pause) of
        buffer ->
            ?DEBUG("Buffered request: ~p", [Request]),
            %% Position 2 is the field the pending requests are keyed on
            %% (presumably #http_put.rid - confirm against the record).
            OtherPending = lists:keydelete(Rid, 2,
                                           StateData#state.unprocessed_req_list),
            %% Keep only the answers that may still be asked for again.
            RecentReqs = [Req || Req <- StateData#state.req_list,
                                 Req#hbr.rid > Rid - 1 - Hold],
            ReqList = [#hbr{rid = Rid, key = StateData#state.key, out = []}
                       | RecentReqs],
            ?DEBUG("reqlist: ~p", [ReqList]),
            cancel_timer(StateData#state.timer),
            InactivityTimer = set_inactivity_timer(0,
                                                   StateData#state.max_inactivity),
            NewState = StateData#state{unprocessed_req_list =
                                           [Request | OtherPending],
                                       req_list = ReqList,
                                       timer = InactivityTimer},
            {reply, ok, StateName, NewState, hibernate};
        RidAllow ->
            process_http_put(Request, StateName, StateData, RidAllow)
    end.

%% Process a PUT whose rid has already been classified by rid_allow/5.
%%
%% RidAllow is one of:
%%   false         - rid outside the accepted window -> {error, not_exists}
%%   repeat        - rid already seen; the cached answer is sent again
%%   {true, Pause} - the next in-order rid; the payload is delivered
%%
%% For in-order requests the BOSH key sequence is verified first (the SHA
%% of the supplied key must equal the stored key) and {error, bad_key} is
%% replied on mismatch. Empty-payload requests with hold = 0 that arrive
%% sooner than ?MIN_POLLING after the previous poll are rejected with
%% {error, polling_too_frequently}.
%%
%% Always returns a valid gen_fsm sync-event result; on success it goes
%% through process_buffered_request/3 so that a buffered follow-up request
%% is processed right away.
process_http_put(#http_put{rid = Rid, attrs = Attrs,
                           payload = Payload, hold = Hold,
                           stream = StreamTo, ip = IP} = Request,
                 StateName, StateData, RidAllow) ->
    ?DEBUG("Actually processing request: ~p", [Request]),
    Key = fxml:get_attr_s(<<"key">>, Attrs),
    NewKey = fxml:get_attr_s(<<"newkey">>, Attrs),
    KeyAllow = case RidAllow of
                   repeat -> true;
                   false -> false;
                   {true, _} ->
                       case StateData#state.key of
                           <<"">> -> true;
                           OldKey ->
                               NextKey = p1_sha:sha(Key),
                               ?DEBUG("Key/OldKey/NextKey: ~s/~s/~s",
                                      [Key, OldKey, NextKey]),
                               if OldKey == NextKey -> true;
                                  true -> ?DEBUG("wrong key: ~s", [Key]), false
                               end
                       end
               end,
    TNow = p1_time_compat:system_time(micro_seconds),
    %% Only empty requests count as polls.
    LastPoll = if Payload == [] -> TNow;
                  true -> 0
               end,
    if (Payload == []) and (Hold == 0) and
       (TNow - StateData#state.last_poll < (?MIN_POLLING)) ->
            {reply, {error, polling_too_frequently}, StateName, StateData};
       KeyAllow ->
            case RidAllow of
                false ->
                    {reply, {error, not_exists}, StateName, StateData};
                repeat ->
                    ?DEBUG("REPEATING ~p", [Rid]),
                    case [El#hbr.out
                          || El <- StateData#state.req_list,
                             El#hbr.rid == Rid]
                    of
                        [] ->
                            %% No cached answer for this rid. This must be a
                            %% proper gen_fsm reply tuple: a bare
                            %% {error, not_exists} would crash the session
                            %% with bad_return_value.
                            {reply, {error, not_exists}, StateName, StateData};
                        [Out | _XS] ->
                            if (Rid == StateData#state.rid) and
                               (StateData#state.http_receiver /= undefined) ->
                                    {reply, ok, StateName, StateData};
                               true ->
                                    {reply, {repeat, lists:reverse(Out)},
                                     StateName,
                                     StateData#state{last_poll = LastPoll}}
                            end
                    end;
                {true, Pause} ->
                    SaveKey = if NewKey == <<"">> -> Key;
                                 true -> NewKey
                              end,
                    ?DEBUG(" -- SaveKey: ~s~n", [SaveKey]),
                    %% Drop answers that can no longer be re-requested.
                    ReqList1 = [El
                                || El <- StateData#state.req_list,
                                   El#hbr.rid > Rid - 1 - Hold],
                    ReqList = case lists:keymember(Rid, #hbr.rid, ReqList1) of
                                  true -> ReqList1;
                                  false ->
                                      [#hbr{rid = Rid,
                                            key = StateData#state.key,
                                            out = []}
                                       | ReqList1]
                              end,
                    ?DEBUG("reqlist: ~p", [ReqList]),
                    cancel_timer(StateData#state.timer),
                    Timer = set_inactivity_timer(Pause,
                                                 StateData#state.max_inactivity),
                    %% Fields updated identically whether or not a consumer
                    %% is waiting for input.
                    BaseState = StateData#state{rid = Rid,
                                                key = SaveKey,
                                                ctime = TNow,
                                                timer = Timer,
                                                pause = Pause,
                                                last_poll = LastPoll,
                                                req_list = ReqList,
                                                ip = IP},
                    case StateData#state.waiting_input of
                        false ->
                            %% Nobody is waiting: queue the payload.
                            Input = lists:foldl(fun queue:in/2,
                                                StateData#state.input,
                                                Payload),
                            process_buffered_request(ok, StateName,
                                                     BaseState#state{input = Input});
                        C2SPid ->
                            %% A stream (re)start is announced first.
                            case StreamTo of
                                {To, <<"">>} ->
                                    gen_fsm:send_event(C2SPid,
                                                       {xmlstreamstart,
                                                        <<"stream:stream">>,
                                                        [{<<"to">>, To},
                                                         {<<"xmlns">>, ?NS_CLIENT},
                                                         {<<"xmlns:stream">>,
                                                          ?NS_STREAM}]});
                                {To, Version} ->
                                    gen_fsm:send_event(C2SPid,
                                                       {xmlstreamstart,
                                                        <<"stream:stream">>,
                                                        [{<<"to">>, To},
                                                         {<<"xmlns">>, ?NS_CLIENT},
                                                         {<<"version">>, Version},
                                                         {<<"xmlns:stream">>,
                                                          ?NS_STREAM}]});
                                _ -> ok
                            end,
                            MaxInactivity =
                                get_max_inactivity(StreamTo,
                                                   StateData#state.max_inactivity),
                            MaxPause =
                                get_max_inactivity(StreamTo,
                                                   StateData#state.max_pause),
                            ?DEBUG("really sending now: ~p", [Payload]),
                            lists:foreach(fun ({xmlstreamend, End}) ->
                                                  gen_fsm:send_event(C2SPid,
                                                                     {xmlstreamend,
                                                                      End});
                                              (El) ->
                                                  gen_fsm:send_event(C2SPid,
                                                                     {xmlstreamelement,
                                                                      El})
                                          end,
                                          Payload),
                            process_buffered_request(
                              ok, StateName,
                              BaseState#state{input = queue:new(),
                                              max_inactivity = MaxInactivity,
                                              max_pause = MaxPause})
                    end
            end;
       true ->
            {reply, {error, bad_key}, StateName, StateData}
    end.

%% After a request has been processed, look for a buffered request carrying
%% the next rid (current rid + 1). If there is one it is removed from
%% unprocessed_req_list and processed immediately; otherwise Reply is
%% returned to the caller and the process hibernates.
process_buffered_request(Reply, StateName, StateData) ->
    NextRid = StateData#state.rid + 1,
    Pending = StateData#state.unprocessed_req_list,
    %% Position 2 is the key field of the buffered request tuples.
    case lists:keytake(NextRid, 2, Pending) of
        {value, Buffered, StillPending} ->
            ?DEBUG("Processing buffered request: ~p", [Buffered]),
            handle_http_put_event(Buffered, StateName,
                                  StateData#state{unprocessed_req_list =
                                                      StillPending});
        false ->
            {reply, Reply, StateName, StateData, hibernate}
    end.

%% Forward a parsed client request to its session and turn the session's
%% answer into an HTTP response tuple:
%%   * unknown sid          -> 404
%%   * {error, Reason}      -> handle_http_put_error/2
%%   * {repeat, OutPacket}  -> the cached packet is sent again
%%   * {wait, Pause}        -> sleep Pause ms (traffic shaper), then retry
%%   * ok                   -> build the response via prepare_response/4
handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize,
                StreamStart, IP) ->
    Outcome = http_put(Sid, Rid, Attrs, Payload, PayloadSize,
                       StreamStart, IP),
    case Outcome of
        {ok, Session} ->
            prepare_response(Session, Rid, [], StreamStart);
        {{wait, Delay}, _Session} ->
            ?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]),
            timer:sleep(Delay),
            handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize,
                            StreamStart, IP);
        {{repeat, Cached}, Session} ->
            ?DEBUG("http_put said 'repeat!' ...~nOutPacket: ~p",
                   [Cached]),
            send_outpacket(Session, Cached);
        {{error, Why}, Session} ->
            ?DEBUG("Error on HTTP put. Reason: ~p", [Why]),
            handle_http_put_error(Why, Session);
        {error, not_exists} ->
            ?DEBUG("no session associated with sid: ~p", [Sid]),
            {404, ?HEADER, <<"">>}
    end.

%% Look the session up in the http_bind mnesia table and synchronously
%% hand it the request (30 s call timeout).
%% Returns {error, not_exists} when there is no such session, otherwise
%% {SessionReply, SessionRecord}.
http_put(Sid, Rid, Attrs, Payload, PayloadSize,
         StreamStart, IP) ->
    ?DEBUG("Looking for session: ~p", [Sid]),
    case mnesia:dirty_read({http_bind, Sid}) of
        [#http_bind{pid = FsmRef, hold = Hold,
                    to = {To, StreamVersion}} = Sess] ->
            %% The stream target is only passed along on a stream start.
            NewStream = if StreamStart =:= true -> {To, StreamVersion};
                           true -> <<"">>
                        end,
            PutEvent = #http_put{rid = Rid,
                                 attrs = Attrs,
                                 payload = Payload,
                                 payload_size = PayloadSize,
                                 hold = Hold,
                                 stream = NewStream,
                                 ip = IP},
            SessionReply = gen_fsm:sync_send_all_state_event(FsmRef, PutEvent,
                                                             30000),
            {SessionReply, Sess};
        [] ->
            {error, not_exists}
    end.

%% Stop the session after a failed PUT and build the HTTP error response.
%%
%% First clause (guard Version >= 0): the error is reported in-band as a
%% <body type='terminate'/> element with a condition attribute, HTTP 200.
%% Second clause: the error is reported through the HTTP status code
%% (404 for a bad rid or key, 403 for too frequent polling).
%% Only the reasons not_exists, bad_key and polling_too_frequently are
%% handled.
handle_http_put_error(Reason,
                      #http_bind{pid = FsmRef, version = Version})
  when Version >= 0 ->
    gen_fsm:sync_send_all_state_event(FsmRef,
                                      {stop, {put_error, Reason}}),
    Condition = case Reason of
                    polling_too_frequently -> <<"policy-violation">>;
                    bad_key -> <<"item-not-found">>;
                    not_exists -> <<"item-not-found">>
                end,
    TerminateEl = #xmlel{name = <<"body">>,
                         attrs = [{<<"xmlns">>, ?NS_HTTP_BIND},
                                  {<<"type">>, <<"terminate">>},
                                  {<<"condition">>, Condition}],
                         children = []},
    {200, ?HEADER, fxml:element_to_binary(TerminateEl)};
handle_http_put_error(Reason,
                      #http_bind{pid = FsmRef}) ->
    gen_fsm:sync_send_all_state_event(FsmRef,
                                      {stop, {put_error_no_version, Reason}}),
    Status = case Reason of
                 not_exists -> %% bad rid
                     ?DEBUG("Closing HTTP bind session (Bad rid).", []),
                     404;
                 bad_key ->
                     ?DEBUG("Closing HTTP bind session (Bad key).", []),
                     404;
                 polling_too_frequently ->
                     ?DEBUG("Closing HTTP bind session (User polling "
                            "too frequently).",
                            []),
                     403
             end,
    {Status, ?HEADER, <<"">>}.

%% Control RID ordering.
%%
%% Classifies an incoming request id against the last processed one:
%%   {true, Pause} - NewRid is the next expected rid (or no rid has been
%%                   seen yet); Pause is the accepted 'pause' attribute
%%                   value, 0 when absent or not acceptable
%%   buffer        - NewRid is ahead by at most Hold + 1; keep it for later
%%   repeat        - NewRid was seen within the last Hold + 1 requests
%%   false         - NewRid is outside both windows
rid_allow(none, _NewRid, _Attrs, _Hold, _MaxPause) ->
    {true, 0};
rid_allow(OldRid, NewRid, Attrs, Hold, MaxPause) ->
    ?DEBUG("Previous rid / New rid: ~p/~p",
           [OldRid, NewRid]),
    if
        %% We did not miss any packet, we can process it immediately:
        NewRid == OldRid + 1 ->
            %% The attribute comes from the client, so a missing or
            %% malformed value must not crash the session: any failure to
            %% read it means "no pause". Negative values are rejected as
            %% well; they would otherwise be stored as a non-zero pause.
            try jlib:binary_to_integer(fxml:get_attr_s(<<"pause">>, Attrs)) of
                Pause when is_integer(Pause), Pause >= 0, Pause =< MaxPause ->
                    ?DEBUG("got pause: ~p", [Pause]),
                    {true, Pause};
                _ ->
                    {true, 0}
            catch
                _:_ -> {true, 0}
            end;
        %% We have missed packets, we need to cache it to process it later on:
        (OldRid < NewRid) and (NewRid =< OldRid + Hold + 1) ->
            buffer;
        (NewRid =< OldRid) and (NewRid > OldRid - Hold - 1) ->
            repeat;
        true -> false
    end.

%% Feed the payload size to the shaper.
%% Returns {NewShaperState, TimerRef}: when the shaper asks for a pause a
%% timer of that length carrying the message 'activate' is started,
%% otherwise TimerRef is 'undefined'.
update_shaper(ShaperState, PayloadSize) ->
    case shaper:update(ShaperState, PayloadSize) of
        {Updated, Delay} when Delay > 0 ->
            {Updated, erlang:start_timer(Delay, self(), activate)};
        {Updated, _NoDelay} ->
            {Updated, undefined}
    end.

%% Wait for the session's configured process_delay, fetch the pending
%% output from the session via http_get/2, and turn it into an HTTP
%% response tuple. A crash or exit of the http_get call is reported to the
%% client as a terminated session (with condition 'system-shutdown' when
%% the exit reason is {shutdown, _}).
prepare_response(Sess, Rid, OutputEls, StreamStart) ->
    receive after Sess#http_bind.process_delay -> ok end,
    Ns = ?NS_HTTP_BIND,
    %% The same body is used for an explicit terminate and for a crash.
    TerminateBody = <<"<body type='terminate' xmlns='", Ns/binary, "'/>">>,
    case catch http_get(Sess, Rid) of
        {ok, cancel} ->
            {200, ?HEADER,
             <<"<body type='error' xmlns='", Ns/binary, "'/>">>};
        {ok, empty} ->
            {200, ?HEADER,
             <<"<body xmlns='", Ns/binary, "'/>">>};
        {ok, terminate} ->
            {200, ?HEADER, TerminateBody};
        {ok, Reversed} ->
            %% The session hands the packet over in reverse order.
            AllEls = OutputEls ++ lists:reverse(Reversed),
            ?DEBUG("OutPacket: ~p", [AllEls]),
            prepare_outpacket_response(Sess, Rid, AllEls, StreamStart);
        {'EXIT', {shutdown, _}} ->
            {200, ?HEADER,
             <<"<body type='terminate' condition='system-shutdown' xmlns='",
               Ns/binary, "'/>">>};
        {'EXIT', _Reason} ->
            {200, ?HEADER, TerminateBody}
    end.

%% Build the HTTP response for a non-empty output packet.
%% The last argument is the StreamStart flag: false for an established
%% session, true when the request created the session.
%%
%% Send output payloads on established sessions.
%% Any crash inside send_outpacket/2 is caught and reported to the client
%% as a terminated session.
prepare_outpacket_response(Sess, _Rid, OutPacket,
			   false) ->
    case catch send_outpacket(Sess, OutPacket) of
      {'EXIT', _Reason} ->
	  ?DEBUG("Error in sending packet ~p ", [_Reason]),
	  {200, ?HEADER,
	   <<"<body type='terminate' xmlns='",
	     (?NS_HTTP_BIND)/binary, "'/>">>};
      SendRes -> SendRes
    end;
%% Handle a new session along with its output payload.
%% The packet is expected to begin with the stream start; anything else
%% is answered with an internal-server-error terminate body.
prepare_outpacket_response(#http_bind{id = Sid,
				      wait = Wait, hold = Hold, to = To} =
			       _Sess,
			   _Rid, OutPacket, true) ->
    case OutPacket of
      [{xmlstreamstart, _, OutAttrs} | Els] ->
	  %% Values taken from the stream header and echoed in the <body/>.
	  AuthID = fxml:get_attr_s(<<"id">>, OutAttrs),
	  From = fxml:get_attr_s(<<"from">>, OutAttrs),
	  Version = fxml:get_attr_s(<<"version">>, OutAttrs),
	  %% Children of the <body/>: a leading stream:features element gets
	  %% the xmlns:stream declaration prepended to its attributes, every
	  %% other element goes through check_default_xmlns/1. Items that are
	  %% not {xmlstreamelement, _} are dropped by the comprehensions.
	  OutEls = case Els of
		     [] -> [];
		     [{xmlstreamelement,
		       #xmlel{name = <<"stream:features">>,
			      attrs = StreamAttribs, children = StreamEls}}
		      | StreamTail] ->
			 TypedTail = [check_default_xmlns(OEl)
				      || {xmlstreamelement, OEl} <- StreamTail],
			 [#xmlel{name = <<"stream:features">>,
				 attrs =
				     [{<<"xmlns:stream">>, ?NS_STREAM}] ++
				       StreamAttribs,
				 children = StreamEls}]
			   ++ TypedTail;
		     StreamTail ->
			 [check_default_xmlns(OEl)
			  || {xmlstreamelement, OEl} <- StreamTail]
		   end,
	  case OutEls of
	    %% A lone stream:error right after the stream start is reported
	    %% to the client as 'host-unknown'.
	    [#xmlel{name = <<"stream:error">>}] ->
		{200, ?HEADER,
		 <<"<body type='terminate' condition='host-unknow"
		   "n' xmlns='",
		   (?NS_HTTP_BIND)/binary, "'/>">>};
	    _ ->
		%% xmpp:version is only added when the body has children.
		BOSH_attribs = [{<<"authid">>, AuthID},
				{<<"xmlns:xmpp">>, ?NS_BOSH},
				{<<"xmlns:stream">>, ?NS_STREAM}]
				 ++
				 case OutEls of
				   [] -> [];
				   _ -> [{<<"xmpp:version">>, Version}]
				 end,
		MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY),
		MaxPause = get_max_pause(To),
		%% Session creation response. MaxInactivity is divided by
		%% 1000 for the 'inactivity' attribute and ?MIN_POLLING by
		%% 1000000 for 'polling' (presumably milliseconds and
		%% microseconds converted to seconds - confirm against the
		%% macro definitions).
		{200, ?HEADER,
		 fxml:element_to_binary(#xmlel{name = <<"body">>,
					      attrs =
						  [{<<"xmlns">>, ?NS_HTTP_BIND},
						   {<<"sid">>, Sid},
						   {<<"wait">>,
						    iolist_to_binary(integer_to_list(Wait))},
						   {<<"requests">>,
						    iolist_to_binary(integer_to_list(Hold
										       +
										       1))},
						   {<<"inactivity">>,
						    iolist_to_binary(integer_to_list(trunc(MaxInactivity
											     /
											     1000)))},
						   {<<"maxpause">>,
						    iolist_to_binary(integer_to_list(MaxPause))},
						   {<<"polling">>,
						    iolist_to_binary(integer_to_list(trunc((?MIN_POLLING)
											     /
											     1000000)))},
						   {<<"ver">>, ?BOSH_VERSION},
						   {<<"from">>, From},
						   {<<"secure">>, <<"true">>}]
						    ++ BOSH_attribs,
					      children = OutEls})}
	  end;
      _ ->
	  {200, ?HEADER,
	   <<"<body type='terminate' condition='internal-se"
	     "rver-error' xmlns='",
	     (?NS_HTTP_BIND)/binary, "'/>">>}
    end.

%% Synchronously ask the session process for its pending output for Rid.
%% The call timeout is 2 * ?MAX_WAIT * 1000.
http_get(#http_bind{pid = FsmRef, wait = Wait, hold = Hold}, Rid) ->
    CallTimeout = 2 * (?MAX_WAIT) * 1000,
    GetEvent = {http_get, Rid, Wait, Hold},
    gen_fsm:sync_send_all_state_event(FsmRef, GetEvent, CallTimeout).

%% Serialise an output packet of an established session into an HTTP
%% response tuple {200, ?HEADER, Body}.
%%
%%   * []                    -> empty <body/>
%%   * [{xmlstreamend, _}]   -> the session is stopped, empty <body/>
%%   * only stream elements / raw data (and no stream:error)
%%                           -> the serialised elements wrapped in <body/>
%%   * packet starting with a stream start
%%                           -> the elements that follow, wrapped in <body/>
%%   * anything else         -> the session is stopped and a terminate body
%%                              is returned: 'remote-stream-error' carrying
%%                              the first stream:error element if there is
%%                              one, 'internal-server-error' otherwise
send_outpacket(#http_bind{pid = FsmRef}, OutPacket) ->
    case OutPacket of
        [] ->
            {200, ?HEADER,
             <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'/>">>};
        [{xmlstreamend, _}] ->
            gen_fsm:sync_send_all_state_event(FsmRef,
                                              {stop, stream_closed}),
            {200, ?HEADER,
             <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'/>">>};
        _ ->
            AllElements = lists:all(fun ({xmlstreamelement,
                                          #xmlel{name = <<"stream:error">>}}) ->
                                            false;
                                        ({xmlstreamelement, _}) -> true;
                                        ({xmlstreamraw, _}) -> true;
                                        (_) -> false
                                    end,
                                    OutPacket),
            case AllElements of
                true ->
                    %% One pass, order preserved (appending to an
                    %% accumulator with ++ would be quadratic).
                    TypedEls = lists:map(
                                 fun ({xmlstreamelement, El}) ->
                                         fxml:element_to_binary(
                                           check_default_xmlns(El));
                                     ({xmlstreamraw, R}) ->
                                         R
                                 end,
                                 OutPacket),
                    Body = <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'>",
                             (iolist_to_binary(TypedEls))/binary, "</body>">>,
                    ?DEBUG(" --- outgoing data --- ~n~s~n --- END "
                           "--- ~n",
                           [Body]),
                    {200, ?HEADER, Body};
                false ->
                    case OutPacket of
                        [{xmlstreamstart, _, _} | SEls] ->
                            %% A leading stream:features element gets the
                            %% xmlns:stream declaration; items that are not
                            %% {xmlstreamelement, _} are dropped.
                            OutEls = case SEls of
                                         [{xmlstreamelement,
                                           #xmlel{name = <<"stream:features">>,
                                                  attrs = StreamAttribs,
                                                  children = StreamEls}}
                                          | StreamTail] ->
                                             TypedTail =
                                                 [check_default_xmlns(OEl)
                                                  || {xmlstreamelement, OEl}
                                                         <- StreamTail],
                                             [#xmlel{name = <<"stream:features">>,
                                                     attrs =
                                                         [{<<"xmlns:stream">>,
                                                           ?NS_STREAM}]
                                                         ++ StreamAttribs,
                                                     children = StreamEls}]
                                                 ++ TypedTail;
                                         StreamTail ->
                                             [check_default_xmlns(OEl)
                                              || {xmlstreamelement, OEl}
                                                     <- StreamTail]
                                     end,
                            {200, ?HEADER,
                             fxml:element_to_binary(
                               #xmlel{name = <<"body">>,
                                      attrs = [{<<"xmlns">>, ?NS_HTTP_BIND}],
                                      children = OutEls})};
                        _ ->
                            StreamErrors =
                                [ErrTag
                                 || {xmlstreamelement,
                                     #xmlel{name = <<"stream:error">>} = ErrTag}
                                        <- OutPacket],
                            gen_fsm:sync_send_all_state_event(
                              FsmRef, {stop, {stream_error, OutPacket}}),
                            case StreamErrors of
                                [] ->
                                    {200, ?HEADER,
                                     <<"<body type='terminate' "
                                       "condition='internal-server-error' "
                                       "xmlns='",
                                       (?NS_HTTP_BIND)/binary, "'/>">>};
                                [FirstError | _] ->
                                    %% elements_to_string/1 returns a list of
                                    %% binaries; it has to be flattened into
                                    %% one binary before it can be used as a
                                    %% /binary segment, otherwise building
                                    %% the body raises badarg.
                                    ErrorBin = iolist_to_binary(
                                                 elements_to_string([FirstError])),
                                    {200, ?HEADER,
                                     <<"<body type='terminate' "
                                       "condition='remote-stream-error' "
                                       "xmlns='",
                                       (?NS_HTTP_BIND)/binary, "' ",
                                       "xmlns:stream='",
                                       (?NS_STREAM)/binary, "'>",
                                       ErrorBin/binary,
                                       "</body>">>}
                            end
                    end
            end
    end.

%% Parse the raw HTTP body of a client request.
%% Returns:
%%   {ok, {Sid, Rid, Attrs, Els}} - a <body/> element in the HTTP-bind
%%                                  namespace with an integer rid; Els are
%%                                  its child elements (character data is
%%                                  dropped)
%%   {size_limit, Sid}            - as above, but PayloadSize exceeds
%%                                  MaxStanzaSize
%%   {error, bad_request}         - anything else
parse_request(Data, PayloadSize, MaxStanzaSize) ->
    ?DEBUG("--- incoming data --- ~n~s~n --- END "
           "--- ",
           [Data]),
    case fxml_stream:parse_element(Data) of
        #xmlel{name = <<"body">>, attrs = Attrs, children = Children} ->
            InBindNs = fxml:get_attr_s(<<"xmlns">>, Attrs) == (?NS_HTTP_BIND),
            case InBindNs of
                false ->
                    {error, bad_request};
                true ->
                    RidAttr = catch jlib:binary_to_integer(
                                      fxml:get_attr_s(<<"rid">>, Attrs)),
                    case RidAttr of
                        {'EXIT', _} ->
                            {error, bad_request};
                        Rid ->
                            ChildEls = [Child || #xmlel{} = Child <- Children],
                            Sid = fxml:get_attr_s(<<"sid">>, Attrs),
                            case PayloadSize =< MaxStanzaSize of
                                true -> {ok, {Sid, Rid, Attrs, ChildEls}};
                                false -> {size_limit, Sid}
                            end
                    end
            end;
        #xmlel{} ->
            {error, bad_request};
        {error, _Reason} ->
            {error, bad_request}
    end.

%% Reply to a held HTTP request, if there is one.
%% Receiver is either 'undefined' (nothing is sent, 'ok' is returned) or
%% the gen_fsm caller reference to answer.
send_receiver_reply(Receiver, Reply) ->
    case Receiver of
        undefined -> ok;
        _ -> gen_fsm:reply(Receiver, Reply)
    end.

%% Cancel a timer and remove its timeout message from the mailbox in case
%% it had already fired. 'undefined' stands for "no timer" and is a no-op.
%% Always returns ok.
cancel_timer(Timer) ->
    case Timer of
        undefined ->
            ok;
        _ ->
            erlang:cancel_timer(Timer),
            receive
                {timeout, Timer, _} -> ok
            after 0 ->
                ok
            end
    end.

%% Start the inactivity timer and return its reference.
%% If the client asked for a pause (Pause > 0), the pause value (given in
%% seconds) is used as timer length; otherwise MaxInactivity
%% (milliseconds) is used.
set_inactivity_timer(Pause, MaxInactivity) ->
    Millis = if Pause > 0 -> Pause * 1000;
                true -> MaxInactivity
             end,
    erlang:start_timer(Millis, self(), []).

%% Serialise every XML element of the list, keeping the order.
%% Returns a list with one binary per element.
elements_to_string(Els) ->
    [fxml:element_to_binary(El) || El <- Els].

%% @spec (To, Default::integer()) -> integer()
%% where To = [] | {Host::string(), Version::string()}
%% Returns the max_inactivity option of mod_http_bind for Host converted
%% from seconds to milliseconds, or Default when the option is not set or
%% To carries no host.
get_max_inactivity(To, Default) ->
    case To of
        {Host, _} ->
            Configured = gen_mod:get_module_opt(
                           Host, mod_http_bind, max_inactivity,
                           fun(I) when is_integer(I), I>0 -> I end,
                           undefined),
            case Configured of
                undefined -> Default;
                Seconds when is_integer(Seconds) -> Seconds * 1000
            end;
        _ ->
            Default
    end.

%% Returns the max_pause option of mod_http_bind for the host in To, or
%% ?MAX_PAUSE when the option is not set or To carries no host.
get_max_pause(To) ->
    case To of
        {Host, _} ->
            gen_mod:get_module_opt(Host, mod_http_bind, max_pause,
                                   fun(I) when is_integer(I), I>0 -> I end,
                                   ?MAX_PAUSE);
        _ ->
            ?MAX_PAUSE
    end.

%% Make sure an element carries an xmlns attribute: when it has none (or
%% an empty one), ?NS_CLIENT is prepended to its attributes. Terms that
%% are not #xmlel{} records are returned untouched.
check_default_xmlns(#xmlel{name = Name, attrs = Attrs,
                           children = Children} = El) ->
    HasXmlns = fxml:get_tag_attr_s(<<"xmlns">>, El) =/= <<"">>,
    if HasXmlns ->
            El;
       true ->
            #xmlel{name = Name,
                   attrs = [{<<"xmlns">>, ?NS_CLIENT} | Attrs],
                   children = Children}
    end;
check_default_xmlns(Other) ->
    Other.

%% Check that mod_http_bind is loaded for the given host.
%% Returns true when it is; otherwise logs an error explaining the likely
%% configuration problem and returns false.
check_bind_module(XmppDomain) ->
    Loaded = gen_mod:is_loaded(XmppDomain, mod_http_bind),
    case Loaded of
        false ->
            ?ERROR_MSG("You are trying to use BOSH (HTTP Bind) "
                       "in host ~p, but the module mod_http_bind "
                       "is not started in that host. Configure "
                       "your BOSH client to connect to the correct "
                       "host, or add your desired host to the "
                       "configuration, or check your 'modules' "
                       "section in your ejabberd configuration "
                       "file.",
                       [XmppDomain]),
            false;
        true ->
            true
    end.