aboutsummaryrefslogblamecommitdiff
path: root/src/ejabberd_bosh.erl
blob: 386ff986b832509f74f347c0b9112143751d44c1 (plain) (tree)
1
2
3
4
5
6
7
8






                                                                           
                                                  
















                                                                           

                        


                              



                                                      
                                                  
                                                           
                                        






                                                           
                       
                                      
                              



















                                                   





                            












                                                              

                                                                      
                                                                              
                                                                       
                                                              

                                                                             

                                                                   

                                                                       
                                                                                    
                                                                                   

                                                                     
                                                                      










                                                                         
                                                                   












                                                                
                                                  


                                    
                                               





                                              
                                                       










                                                                       
                                             



                                                              
                                                                   










                                                                                  

                                                   

                                                  
                                                                 
 
                                  
                                                  





                                                      




                                      




























































                                                                                         
                                               























                                                                              
                                              


                                              
                                                            

                                                              
                                                                        

                                                                               
                                                         

                                   
                                                                          

                                        
                                                                          
                                                            







                                                                                  





                                                      



                           


                                  
                                                            





                                                  
                                                         








                                                



                                                              
                                                                 






                                                                        
                                                                               
                      
                                                                        








                                                                          















                                                                    

                                         
                                                                 





                                                      
                                                  




                                                     
                                                    


                               
                                                               

                                                     







                                                             
                                          











                                                                         
                                                       





























                                                                           
                                                               





































                                                                             
                                              





























                                                                           
                                           




                                                      
                                                                
                                         
                                               





















                                                          
                                                                   

                                                        
                                                 
                                                                 











                                                            
                                                    




                                                     

                                                           




                                                       
                                                      
                                        
                                                 








                                                                 
                                       
                                                                


                                   



                                            
                                            









                                                 


                                                             
                                                     


                                  




                                   
                                                             





















                                                            
                                                  






















                                                            
                                                       
                                    
                                                       





                                                                    

                                           


                                   
                                                      

                                         
                             









                                                                     
                                   





                                                                      
                                                                                






                                                                     




                                                              
                                           
                     
                                    
                                                      













                                                                    
                                


                                     
                            


                                    
                                                                 


























                                                                       
                                                                 


























































































































                                                                                              


                                    

































                                                                             
                                






















                                                        
                                                       









                                             
                                            




























                                                                  



                             
                                              
                                   

                  
                                            


                                                  
                                                                


                                                  
                             




                                             
                                    
                            















                                                            
                            








                                                         
                                                                 




                                                        

                                               

                                             
%%%-------------------------------------------------------------------
%%% File    : ejabberd_bosh.erl
%%% Author  : Evgeniy Khramtsov <ekhramtsov@process-one.net>
%%% Purpose : Manage BOSH sockets
%%% Created : 20 Jul 2011 by Evgeniy Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2020   ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%-------------------------------------------------------------------
-module(ejabberd_bosh).
-behaviour(xmpp_socket).
-behaviour(p1_fsm).
-protocol({xep, 124, '1.11'}).
-protocol({xep, 206, '1.4'}).

%% API
-export([start/2, start/3, start_link/3]).

-export([send_xml/2, setopts/2, controlling_process/2,
	 reset_stream/1, change_shaper/2, close/1,
	 sockname/1, peername/1, process_request/3, send/2,
	 get_transport/1, get_owner/1]).

%% gen_fsm callbacks
-export([init/1, wait_for_session/2, wait_for_session/3,
	 active/2, active/3, handle_event/3, print_state/1,
	 handle_sync_event/4, handle_info/3, terminate/3,
	 code_change/4]).

-include("logger.hrl").
-include_lib("xmpp/include/xmpp.hrl").
-include("ejabberd_http.hrl").
-include("bosh.hrl").

%%-define(DBGFSM, true).
-ifdef(DBGFSM).

-define(FSMOPTS, [{debug, [trace]}]).

-else.

-define(FSMOPTS, []).

-endif.

-define(BOSH_VERSION, <<"1.11">>).

-define(NS_BOSH, <<"urn:xmpp:xbosh">>).

-define(NS_HTTP_BIND,
	<<"http://jabber.org/protocol/httpbind">>).

-define(DEFAULT_WAIT, 300).

-define(DEFAULT_HOLD, 1).

-define(DEFAULT_POLLING, 2).

-define(MAX_SHAPED_REQUESTS_QUEUE_LEN, 1000).

-define(SEND_TIMEOUT, 15000).

-type bosh_socket() :: {http_bind, pid(),
                        {inet:ip_address(),
                         inet:port_number()}}.

-export_type([bosh_socket/0]).

-record(state,
	{host = <<"">>                            :: binary(),
         sid = <<"">>                             :: binary(),
         el_ibuf                                  :: p1_queue:queue(),
         el_obuf                                  :: p1_queue:queue(),
         shaper_state = none                      :: ejabberd_shaper:shaper(),
         c2s_pid                                  :: pid() | undefined,
	 xmpp_ver = <<"">>                        :: binary(),
         inactivity_timer                         :: reference() | undefined,
         wait_timer                               :: reference() | undefined,
	 wait_timeout = ?DEFAULT_WAIT             :: pos_integer(),
         inactivity_timeout                       :: pos_integer(),
	 prev_rid = 0                             :: non_neg_integer(),
         prev_key = <<"">>                        :: binary(),
         prev_poll                                :: erlang:timestamp() | undefined,
         max_concat = unlimited                   :: unlimited | non_neg_integer(),
	 responses = gb_trees:empty()             :: gb_trees:tree(),
	 receivers = gb_trees:empty()             :: gb_trees:tree(),
	 shaped_receivers                         :: p1_queue:queue(),
         ip                                       :: inet:ip_address(),
         max_requests = 1                         :: non_neg_integer()}).

-record(body,
	{http_reason = <<"">> :: binary(),
         attrs = []           :: [{any(), any()}],
         els = []             :: [fxml_stream:xml_stream_el()],
         size = 0             :: non_neg_integer()}).

start(#body{attrs = Attrs} = Body, IP, SID) ->
    XMPPDomain = get_attr(to, Attrs),
    SupervisorProc = gen_mod:get_module_proc(XMPPDomain, mod_bosh),
    case catch supervisor:start_child(SupervisorProc,
				      [Body, IP, SID])
	of
      {ok, Pid} -> {ok, Pid};
      {'EXIT', {noproc, _}} ->
	  check_bosh_module(XMPPDomain),
	  {error, module_not_loaded};
      Err ->
	  ?ERROR_MSG("Failed to start BOSH session: ~p", [Err]),
	  {error, Err}
    end.

start(StateName, State) ->
    p1_fsm:start_link(?MODULE, [StateName, State],
			  ?FSMOPTS).

start_link(Body, IP, SID) ->
    p1_fsm:start_link(?MODULE, [Body, IP, SID],
			  ?FSMOPTS).

send({http_bind, FsmRef, IP}, Packet) ->
    send_xml({http_bind, FsmRef, IP}, Packet).

send_xml({http_bind, FsmRef, _IP}, Packet) ->
    case catch p1_fsm:sync_send_all_state_event(FsmRef,
						    {send_xml, Packet},
						    ?SEND_TIMEOUT)
	of
      {'EXIT', {timeout, _}} -> {error, timeout};
      {'EXIT', _} -> {error, einval};
      Res -> Res
    end.

setopts({http_bind, FsmRef, _IP}, Opts) ->
    case lists:member({active, once}, Opts) of
      true ->
	  p1_fsm:send_all_state_event(FsmRef,
					  {activate, self()});
      _ ->
	  case lists:member({active, false}, Opts) of
	    true ->
		case catch p1_fsm:sync_send_all_state_event(FsmRef,
								deactivate_socket)
		    of
		  {'EXIT', _} -> {error, einval};
		  Res -> Res
		end;
	    _ -> ok
	  end
    end.

controlling_process(_Socket, _Pid) -> ok.

reset_stream({http_bind, _FsmRef, _IP} = Socket) ->
    Socket.

change_shaper({http_bind, FsmRef, _IP}, Shaper) ->
    p1_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}).

close({http_bind, FsmRef, _IP}) ->
    catch p1_fsm:sync_send_all_state_event(FsmRef,
					       close).

sockname(_Socket) -> {ok, {{0, 0, 0, 0}, 0}}.

peername({http_bind, _FsmRef, IP}) -> {ok, IP}.

get_transport(_Socket) ->
    http_bind.

get_owner({http_bind, FsmRef, _IP}) ->
    FsmRef.

process_request(Data, IP, Type) ->
    Opts1 = ejabberd_c2s_config:get_c2s_limits(),
    Opts = case Type of
               xml ->
                   [{xml_socket, true} | Opts1];
               json ->
                   Opts1
           end,
    MaxStanzaSize = case lists:keysearch(max_stanza_size, 1,
					 Opts)
			of
		      {value, {_, Size}} -> Size;
		      _ -> infinity
		    end,
    PayloadSize = iolist_size(Data),
    if PayloadSize > MaxStanzaSize ->
	   http_error(403, <<"Request Too Large">>, Type);
       true ->
	   case decode_body(Data, PayloadSize, Type) of
	     {ok, #body{attrs = Attrs} = Body} ->
		 SID = get_attr(sid, Attrs),
		 To = get_attr(to, Attrs),
		 if SID == <<"">>, To == <<"">> ->
			bosh_response_with_msg(#body{http_reason =
						<<"Missing 'to' attribute">>,
					    attrs =
						[{type, <<"terminate">>},
						 {condition,
						  <<"improper-addressing">>}]},
                                      Type, Body);
		    SID == <<"">> ->
			case start(Body, IP, make_sid()) of
			  {ok, Pid} -> process_request(Pid, Body, IP, Type);
			  _Err ->
			      bosh_response_with_msg(#body{http_reason =
						      <<"Failed to start BOSH session">>,
						  attrs =
						      [{type, <<"terminate">>},
						       {condition,
							<<"internal-server-error">>}]},
                                            Type, Body)
			end;
		    true ->
			case mod_bosh:find_session(SID) of
			  {ok, Pid} -> process_request(Pid, Body, IP, Type);
			  error ->
			      bosh_response_with_msg(#body{http_reason =
						      <<"Session ID mismatch">>,
						  attrs =
						      [{type, <<"terminate">>},
						       {condition,
							<<"item-not-found">>}]},
                                            Type, Body)
			end
		 end;
	     {error, Reason} -> http_error(400, Reason, Type)
	   end
    end.

process_request(Pid, Req, _IP, Type) ->
    case catch p1_fsm:sync_send_event(Pid, Req,
					  infinity)
	of
      #body{} = Resp -> bosh_response(Resp, Type);
      {'EXIT', {Reason, _}}
	  when Reason == noproc; Reason == normal ->
	  bosh_response(#body{http_reason =
				  <<"BOSH session not found">>,
			      attrs =
				  [{type, <<"terminate">>},
				   {condition, <<"item-not-found">>}]},
                        Type);
      {'EXIT', _} ->
	  bosh_response(#body{http_reason =
				  <<"Unexpected error">>,
			      attrs =
				  [{type, <<"terminate">>},
				   {condition, <<"internal-server-error">>}]},
                        Type)
    end.

init([#body{attrs = Attrs}, IP, SID]) ->
    Opts1 = ejabberd_c2s_config:get_c2s_limits(),
    Opts2 = [{xml_socket, true} | Opts1],
    Shaper = none,
    ShaperState = ejabberd_shaper:new(Shaper),
    Socket = make_socket(self(), IP),
    XMPPVer = get_attr('xmpp:version', Attrs),
    XMPPDomain = get_attr(to, Attrs),
    {InBuf, Opts} = case mod_bosh_opt:prebind(XMPPDomain) of
                        true ->
                            JID = make_random_jid(XMPPDomain),
                            {buf_new(XMPPDomain), [{jid, JID} | Opts2]};
                        false ->
                            {buf_in([make_xmlstreamstart(XMPPDomain, XMPPVer)],
                                    buf_new(XMPPDomain)),
                             Opts2}
		    end,
    case ejabberd_c2s:start(?MODULE, Socket, [{receiver, self()}|Opts]) of
	{ok, C2SPid} ->
	    ejabberd_c2s:accept(C2SPid),
	    Inactivity = mod_bosh_opt:max_inactivity(XMPPDomain) div 1000,
	    MaxConcat = mod_bosh_opt:max_concat(XMPPDomain),
	    ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN),
	    State = #state{host = XMPPDomain, sid = SID, ip = IP,
			   xmpp_ver = XMPPVer, el_ibuf = InBuf,
			   max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain),
			   inactivity_timeout = Inactivity,
			   shaped_receivers = ShapedReceivers,
			   shaper_state = ShaperState},
	    NewState = restart_inactivity_timer(State),
	    case mod_bosh:open_session(SID, self()) of
		ok ->
		    {ok, wait_for_session, NewState};
		{error, Reason} ->
		    {stop, Reason}
	    end;
	{error, Reason} ->
	    {stop, Reason};
	ignore ->
	    ignore
    end.

wait_for_session(_Event, State) ->
    ?ERROR_MSG("Unexpected event in 'wait_for_session': ~p",
	       [_Event]),
    {next_state, wait_for_session, State}.

wait_for_session(#body{attrs = Attrs} = Req, From,
		 State) ->
    RID = get_attr(rid, Attrs),
    ?DEBUG("Got request:~n** RequestID: ~p~n** Request: "
	   "~p~n** From: ~p~n** State: ~p",
	   [RID, Req, From, State]),
    Wait = min(get_attr(wait, Attrs, undefined),
	       ?DEFAULT_WAIT),
    Hold = min(get_attr(hold, Attrs, undefined),
	       ?DEFAULT_HOLD),
    NewKey = get_attr(newkey, Attrs),
    Type = get_attr(type, Attrs),
    Requests = Hold + 1,
    PollTime = if
		   Wait == 0, Hold == 0 -> erlang:timestamp();
		   true -> undefined
	       end,
    MaxPause = mod_bosh_opt:max_pause(State#state.host) div 1000,
    Resp = #body{attrs =
		     [{sid, State#state.sid}, {wait, Wait},
		      {ver, ?BOSH_VERSION}, {polling, ?DEFAULT_POLLING},
		      {inactivity, State#state.inactivity_timeout},
		      {hold, Hold}, {'xmpp:restartlogic', true},
		      {requests, Requests}, {secure, true},
		      {maxpause, MaxPause}, {'xmlns:xmpp', ?NS_BOSH},
		      {'xmlns:stream', ?NS_STREAM}, {from, State#state.host}]},
    {ShaperState, _} =
	ejabberd_shaper:update(State#state.shaper_state, Req#body.size),
    State1 = State#state{wait_timeout = Wait,
			 prev_rid = RID, prev_key = NewKey,
			 prev_poll = PollTime, shaper_state = ShaperState,
			 max_requests = Requests},
    Els = maybe_add_xmlstreamend(Req#body.els, Type),
    State2 = route_els(State1, Els),
    {State3, RespEls} = get_response_els(State2),
    State4 = stop_inactivity_timer(State3),
    case RespEls of
	[{xmlstreamstart, _, _} = El1] ->
	    OutBuf = buf_in([El1], State4#state.el_obuf),
	    State5 = restart_wait_timer(State4),
	    Receivers = gb_trees:insert(RID, {From, Resp},
					State5#state.receivers),
	    {next_state, active,
	     State5#state{receivers = Receivers, el_obuf = OutBuf}};
	[] ->
	    State5 = restart_wait_timer(State4),
	    Receivers = gb_trees:insert(RID, {From, Resp},
					State5#state.receivers),
	    {next_state, active,
	     State5#state{receivers = Receivers}};
	_ ->
	    reply_next_state(State4, Resp#body{els = RespEls}, RID,
			     From)
    end;
wait_for_session(_Event, _From, State) ->
    ?ERROR_MSG("Unexpected sync event in 'wait_for_session': ~p",
	       [_Event]),
    {reply, {error, badarg}, wait_for_session, State}.

active({#body{} = Body, From}, State) ->
    active1(Body, From, State);
active(_Event, State) ->
    ?ERROR_MSG("Unexpected event in 'active': ~p",
	       [_Event]),
    {next_state, active, State}.

active(#body{attrs = Attrs, size = Size} = Req, From,
       State) ->
    ?DEBUG("Got request:~n** Request: ~p~n** From: "
	   "~p~n** State: ~p",
	   [Req, From, State]),
    {ShaperState, Pause} =
	ejabberd_shaper:update(State#state.shaper_state, Size),
    State1 = State#state{shaper_state = ShaperState},
    if Pause > 0 ->
	    TRef = start_shaper_timer(Pause),
	    try p1_queue:in({TRef, From, Req},
			    State1#state.shaped_receivers) of
		Q ->
		    State2 = stop_inactivity_timer(State1),
		    {next_state, active,
		     State2#state{shaped_receivers = Q}}
	    catch error:full ->
		  misc:cancel_timer(TRef),
		  RID = get_attr(rid, Attrs),
		  reply_stop(State1,
			     #body{http_reason = <<"Too many requests">>,
				   attrs =
				       [{<<"type">>, <<"terminate">>},
					{<<"condition">>,
					 <<"policy-violation">>}]},
			     From, RID)
	   end;
       true -> active1(Req, From, State1)
    end;
active(_Event, _From, State) ->
    ?ERROR_MSG("Unexpected sync event in 'active': ~p",
	       [_Event]),
    {reply, {error, badarg}, active, State}.

active1(#body{attrs = Attrs} = Req, From, State) ->
    RID = get_attr(rid, Attrs),
    Key = get_attr(key, Attrs),
    IsValidKey = is_valid_key(State#state.prev_key, Key),
    IsOveractivity = is_overactivity(State#state.prev_poll),
    Type = get_attr(type, Attrs),
    if RID >
	 State#state.prev_rid + State#state.max_requests ->
	   reply_stop(State,
		      #body{http_reason = <<"Request ID is out of range">>,
			    attrs =
				[{<<"type">>, <<"terminate">>},
				 {<<"condition">>, <<"item-not-found">>}]},
		      From, RID);
       RID > State#state.prev_rid + 1 ->
	   State1 = restart_inactivity_timer(State),
	   Receivers = gb_trees:insert(RID, {From, Req},
				       State1#state.receivers),
	   {next_state, active,
	    State1#state{receivers = Receivers}};
       RID =< State#state.prev_rid ->
            %% TODO: do we need to check 'key' here? It seems so...
            case gb_trees:lookup(RID, State#state.responses) of
                {value, PrevBody} ->
                    {next_state, active,
                     do_reply(State, From, PrevBody, RID)};
                none ->
                    State1 = drop_holding_receiver(State, RID),
                    State2 = stop_inactivity_timer(State1),
                    State3 = restart_wait_timer(State2),
                    Receivers = gb_trees:insert(RID, {From, Req},
                                                State3#state.receivers),
                    {next_state, active, State3#state{receivers = Receivers}}
            end;
       not IsValidKey ->
	   reply_stop(State,
		      #body{http_reason = <<"Session key mismatch">>,
			    attrs =
				[{<<"type">>, <<"terminate">>},
				 {<<"condition">>, <<"item-not-found">>}]},
		      From, RID);
       IsOveractivity ->
	   reply_stop(State,
		      #body{http_reason = <<"Too many requests">>,
			    attrs =
				[{<<"type">>, <<"terminate">>},
				 {<<"condition">>, <<"policy-violation">>}]},
		      From, RID);
       true ->
	   State1 = stop_inactivity_timer(State),
	   State2 = stop_wait_timer(State1),
	   Els = case get_attr('xmpp:restart', Attrs, false) of
		   true ->
		       XMPPDomain = get_attr(to, Attrs, State#state.host),
		       XMPPVer = get_attr('xmpp:version', Attrs,
					  State#state.xmpp_ver),
		       [make_xmlstreamstart(XMPPDomain, XMPPVer)];
		   false -> Req#body.els
		 end,
	   State3 = route_els(State2,
			      maybe_add_xmlstreamend(Els, Type)),
	   {State4, RespEls} = get_response_els(State3),
	   NewKey = get_attr(newkey, Attrs, Key),
	   Pause = get_attr(pause, Attrs, undefined),
	   NewPoll = case State#state.prev_poll of
		       undefined -> undefined;
		       _ -> erlang:timestamp()
		     end,
	   State5 = State4#state{prev_poll = NewPoll,
				 prev_key = NewKey},
	   if Type == <<"terminate">> ->
		  reply_stop(State5,
			     #body{http_reason = <<"Session close">>,
				   attrs = [{<<"type">>, <<"terminate">>}],
				   els = RespEls},
			     From, RID);
	      Pause /= undefined ->
		  State6 = drop_holding_receiver(State5),
		  State7 = restart_inactivity_timer(State6, Pause),
		  InBuf = buf_in(RespEls, State7#state.el_ibuf),
		  {next_state, active,
		   State7#state{prev_rid = RID, el_ibuf = InBuf}};
	      RespEls == [] ->
		  State6 = drop_holding_receiver(State5),
		  State7 = stop_inactivity_timer(State6),
		  State8 = restart_wait_timer(State7),
		  Receivers = gb_trees:insert(RID, {From, #body{}},
					      State8#state.receivers),
		  {next_state, active,
		   State8#state{prev_rid = RID, receivers = Receivers}};
	      true ->
		  State6 = drop_holding_receiver(State5),
		  reply_next_state(State6#state{prev_rid = RID},
				   #body{els = RespEls}, RID, From)
	   end
    end.

handle_event({activate, C2SPid}, StateName,
	     State) ->
    State1 = route_els(State#state{c2s_pid = C2SPid}),
    {next_state, StateName, State1};
handle_event({change_shaper, Shaper}, StateName,
	     State) ->
    {next_state, StateName, State#state{shaper_state = Shaper}};
handle_event(_Event, StateName, State) ->
    ?ERROR_MSG("Unexpected event in '~ts': ~p",
	       [StateName, _Event]),
    {next_state, StateName, State}.

handle_sync_event({send_xml,
		   {xmlstreamstart, _, _} = El},
		  _From, StateName, State)
    when State#state.xmpp_ver >= <<"1.0">> ->
    OutBuf = buf_in([El], State#state.el_obuf),
    {reply, ok, StateName, State#state{el_obuf = OutBuf}};
handle_sync_event({send_xml, El}, _From, StateName,
		  State) ->
    OutBuf = buf_in([El], State#state.el_obuf),
    State1 = State#state{el_obuf = OutBuf},
    case gb_trees:lookup(State1#state.prev_rid,
			 State1#state.receivers)
	of
      {value, {From, Body}} ->
	  {State2, Els} = get_response_els(State1),
	  {reply, ok, StateName,
	   reply(State2, Body#body{els = Els},
		 State2#state.prev_rid, From)};
      none ->
	  State2 = case p1_queue:out(State1#state.shaped_receivers)
		       of
		     {{value, {TRef, From, Body}}, Q} ->
			 misc:cancel_timer(TRef),
			 p1_fsm:send_event(self(), {Body, From}),
			 State1#state{shaped_receivers = Q};
		     _ -> State1
		   end,
	  {reply, ok, StateName, State2}
    end;
handle_sync_event(close, _From, _StateName, State) ->
    {stop, normal, State};
handle_sync_event(deactivate_socket, _From, StateName,
		  StateData) ->
    {reply, ok, StateName,
     StateData#state{c2s_pid = undefined}};
handle_sync_event(_Event, _From, StateName, State) ->
    ?ERROR_MSG("Unexpected sync event in '~ts': ~p",
	       [StateName, _Event]),
    {reply, {error, badarg}, StateName, State}.

handle_info({timeout, TRef, wait_timeout}, StateName,
	    #state{wait_timer = TRef} = State) ->
    State2 = State#state{wait_timer = undefined},
    {next_state, StateName, drop_holding_receiver(State2)};
handle_info({timeout, TRef, inactive}, _StateName,
	    #state{inactivity_timer = TRef} = State) ->
    {stop, normal, State};
handle_info({timeout, TRef, shaper_timeout}, StateName,
	    State) ->
    case p1_queue:out(State#state.shaped_receivers) of
      {{value, {TRef, From, Req}}, Q} ->
	  p1_fsm:send_event(self(), {Req, From}),
	  {next_state, StateName,
	   State#state{shaped_receivers = Q}};
      {{value, _}, _} ->
	  ?ERROR_MSG("shaper_timeout mismatch:~n** TRef: ~p~n** "
		     "State: ~p",
		     [TRef, State]),
	  {stop, normal, State};
      _ -> {next_state, StateName, State}
    end;
handle_info(_Info, StateName, State) ->
    ?ERROR_MSG("Unexpected info:~n** Msg: ~p~n** StateName: ~p",
	       [_Info, StateName]),
    {next_state, StateName, State}.

terminate(_Reason, _StateName, State) ->
    mod_bosh:close_session(State#state.sid),
    case State#state.c2s_pid of
      C2SPid when is_pid(C2SPid) ->
	  p1_fsm:send_event(C2SPid, closed);
      _ -> ok
    end,
    bounce_receivers(State, closed),
    bounce_els_from_obuf(State).

code_change(_OldVsn, StateName, State, _Extra) ->
    {ok, StateName, State}.

print_state(State) -> State.

route_els(#state{el_ibuf = Buf, c2s_pid = C2SPid} = State) ->
    NewBuf = p1_queue:dropwhile(
	       fun(El) ->
		       p1_fsm:send_event(C2SPid, El),
		       true
	       end, Buf),
    State#state{el_ibuf = NewBuf}.

route_els(State, Els) ->
    case State#state.c2s_pid of
      C2SPid when is_pid(C2SPid) ->
	  lists:foreach(fun (El) ->
				p1_fsm:send_event(C2SPid, El)
			end,
			Els),
	  State;
      _ ->
	  InBuf = buf_in(Els, State#state.el_ibuf),
	  State#state{el_ibuf = InBuf}
    end.

get_response_els(#state{el_obuf = OutBuf,
			max_concat = MaxConcat} =
		     State) ->
    {Els, NewOutBuf} = buf_out(OutBuf, MaxConcat),
    {State#state{el_obuf = NewOutBuf}, Els}.

reply(State, Body, RID, From) ->
    State1 = restart_inactivity_timer(State),
    Receivers = gb_trees:delete_any(RID,
				    State1#state.receivers),
    State2 = do_reply(State1, From, Body, RID),
    case catch gb_trees:take_smallest(Receivers) of
      {NextRID, {From1, Req}, Receivers1}
	  when NextRID == RID + 1 ->
	  p1_fsm:send_event(self(), {Req, From1}),
	  State2#state{receivers = Receivers1};
      _ -> State2#state{receivers = Receivers}
    end.

reply_next_state(State, Body, RID, From) ->
    State1 = restart_inactivity_timer(State),
    Receivers = gb_trees:delete_any(RID,
				    State1#state.receivers),
    State2 = do_reply(State1, From, Body, RID),
    case catch gb_trees:take_smallest(Receivers) of
      {NextRID, {From1, Req}, Receivers1}
	  when NextRID == RID + 1 ->
	  active(Req, From1,
		 State2#state{receivers = Receivers1});
      _ ->
	  {next_state, active,
	   State2#state{receivers = Receivers}}
    end.

reply_stop(State, Body, From, RID) ->
    {stop, normal, do_reply(State, From, Body, RID)}.

drop_holding_receiver(State) ->
    drop_holding_receiver(State, State#state.prev_rid).
drop_holding_receiver(State, RID) ->
    case gb_trees:lookup(RID, State#state.receivers) of
	{value, {From, Body}} ->
	    State1 = restart_inactivity_timer(State),
	    Receivers = gb_trees:delete_any(RID,
					    State1#state.receivers),
	    State2 = State1#state{receivers = Receivers},
	    do_reply(State2, From, Body, RID);
	none ->
	    restart_inactivity_timer(State)
    end.

do_reply(State, From, Body, RID) ->
    ?DEBUG("Send reply:~n** RequestID: ~p~n** Reply: "
	   "~p~n** To: ~p~n** State: ~p",
	   [RID, Body, From, State]),
    p1_fsm:reply(From, Body),
    Responses = gb_trees:delete_any(RID,
				    State#state.responses),
    Responses1 = case gb_trees:size(Responses) of
		   N when N < State#state.max_requests; N == 0 ->
		       Responses;
		   _ -> element(3, gb_trees:take_smallest(Responses))
		 end,
    Responses2 = gb_trees:insert(RID, Body, Responses1),
    State#state{responses = Responses2}.

bounce_receivers(State, _Reason) ->
    Receivers = gb_trees:to_list(State#state.receivers),
    ShapedReceivers = lists:map(fun ({_, From,
				      #body{attrs = Attrs} = Body}) ->
					RID = get_attr(rid, Attrs),
					{RID, {From, Body}}
				end,
				p1_queue:to_list(State#state.shaped_receivers)),
    lists:foldl(fun ({RID, {From, _Body}}, AccState) ->
			NewBody = #body{http_reason =
					    <<"Session closed">>,
					attrs =
					    [{type, <<"terminate">>},
					     {condition,
					      <<"other-request">>}]},
			do_reply(AccState, From, NewBody, RID)
		end,
		State, Receivers ++ ShapedReceivers).

bounce_els_from_obuf(State) ->
    Opts = ejabberd_config:codec_options(),
    p1_queue:foreach(
      fun({xmlstreamelement, El}) ->
	      try xmpp:decode(El, ?NS_CLIENT, Opts) of
		  Pkt when ?is_stanza(Pkt) ->
		      case {xmpp:get_from(Pkt), xmpp:get_to(Pkt)} of
			  {#jid{}, #jid{}} ->
			      ejabberd_router:route(Pkt);
			  _ ->
			      ok
		      end;
		  _ ->
		      ok
	      catch _:{xmpp_codec, _} ->
		      ok
	      end;
	 (_) ->
	      ok
      end, State#state.el_obuf).

is_valid_key(<<"">>, <<"">>) -> true;
is_valid_key(PrevKey, Key) ->
    str:sha(Key) == PrevKey.

is_overactivity(undefined) -> false;
is_overactivity(PrevPoll) ->
    PollPeriod = timer:now_diff(erlang:timestamp(), PrevPoll) div
		   1000000,
    if PollPeriod < (?DEFAULT_POLLING) -> true;
       true -> false
    end.

make_xmlstreamstart(XMPPDomain, Version) ->
    VersionEl = case Version of
		  <<"">> -> [];
		  _ -> [{<<"version">>, Version}]
		end,
    {xmlstreamstart, <<"stream:stream">>,
     [{<<"to">>, XMPPDomain}, {<<"xmlns">>, ?NS_CLIENT},
      {<<"xmlns:xmpp">>, ?NS_BOSH},
      {<<"xmlns:stream">>, ?NS_STREAM}
      | VersionEl]}.

maybe_add_xmlstreamend(Els, <<"terminate">>) ->
    Els ++ [{xmlstreamend, <<"stream:stream">>}];
maybe_add_xmlstreamend(Els, _) -> Els.

encode_body(#body{attrs = Attrs, els = Els}, Type) ->
    Attrs1 = lists:map(fun ({K, V}) when is_atom(K) ->
			       AmK = iolist_to_binary(atom_to_list(K)),
			       case V of
				 true -> {AmK, <<"true">>};
				 false -> {AmK, <<"false">>};
				 I when is_integer(I), I >= 0 ->
				     {AmK, integer_to_binary(I)};
				 _ -> {AmK, V}
			       end;
			   ({K, V}) -> {K, V}
		       end,
		       Attrs),
    Attrs2 = [{<<"xmlns">>, ?NS_HTTP_BIND} | Attrs1],
    {Attrs3, XMLs} = lists:foldr(fun ({xmlstreamraw, XML},
				      {AttrsAcc, XMLBuf}) ->
					 {AttrsAcc, [XML | XMLBuf]};
				     ({xmlstreamelement,
				       #xmlel{name = <<"stream:error">>} = El},
				      {AttrsAcc, XMLBuf}) ->
					 {[{<<"type">>, <<"terminate">>},
					   {<<"condition">>,
					    <<"remote-stream-error">>},
					   {<<"xmlns:stream">>, ?NS_STREAM}
					   | AttrsAcc],
					  [encode_element(El, Type) | XMLBuf]};
				     ({xmlstreamelement,
				       #xmlel{name = <<"stream:features">>} =
					   El},
				      {AttrsAcc, XMLBuf}) ->
					 {lists:keystore(<<"xmlns:stream">>, 1,
							 AttrsAcc,
							 {<<"xmlns:stream">>,
							  ?NS_STREAM}),
					  [encode_element(El, Type) | XMLBuf]};
                                     ({xmlstreamelement,
                                       #xmlel{name = Name, attrs = EAttrs} = El},
                                      {AttrsAcc, XMLBuf})
                                       when Name == <<"message">>;
                                            Name == <<"presence">>;
                                            Name == <<"iq">> ->
                                         NewAttrs = lists:keystore(
                                                      <<"xmlns">>, 1, EAttrs,
                                                      {<<"xmlns">>, ?NS_CLIENT}),
                                         NewEl = El#xmlel{attrs = NewAttrs},
                                         {AttrsAcc,
                                          [encode_element(NewEl, Type) | XMLBuf]};
				     ({xmlstreamelement, El},
				      {AttrsAcc, XMLBuf}) ->
                                         {AttrsAcc,
                                          [encode_element(El, Type) | XMLBuf]};
				     ({xmlstreamend, _}, {AttrsAcc, XMLBuf}) ->
					 {[{<<"type">>, <<"terminate">>},
					   {<<"condition">>,
					    <<"remote-stream-error">>}
					   | AttrsAcc],
					  XMLBuf};
				     ({xmlstreamstart, <<"stream:stream">>,
				       SAttrs},
				      {AttrsAcc, XMLBuf}) ->
					 StreamID = fxml:get_attr_s(<<"id">>,
								   SAttrs),
					 NewAttrs = case
						      fxml:get_attr_s(<<"version">>,
								     SAttrs)
							of
						      <<"">> ->
							  [{<<"authid">>,
							    StreamID}
							   | AttrsAcc];
						      V ->
							  lists:keystore(<<"xmlns:xmpp">>,
									 1,
									 [{<<"xmpp:version">>,
									   V},
									  {<<"authid">>,
									   StreamID}
									  | AttrsAcc],
									 {<<"xmlns:xmpp">>,
									  ?NS_BOSH})
						    end,
					 {NewAttrs, XMLBuf};
				     ({xmlstreamerror, _},
				      {AttrsAcc, XMLBuf}) ->
					 {[{<<"type">>, <<"terminate">>},
					   {<<"condition">>,
					    <<"remote-stream-error">>}
					   | AttrsAcc],
					  XMLBuf};
				     (_, Acc) -> Acc
				 end,
				 {Attrs2, []}, Els),
    case XMLs of
      [] when Type == xml ->
            [<<"<body">>, attrs_to_list(Attrs3), <<"/>">>];
      _ when Type == xml ->
            [<<"<body">>, attrs_to_list(Attrs3), $>, XMLs,
	     <<"</body>">>]
    end.

encode_element(El, xml) ->
    fxml:element_to_binary(El);
encode_element(El, json) ->
    El.

decode_body(Data, Size, Type) ->
    case decode(Data, Type) of
      #xmlel{name = <<"body">>, attrs = Attrs,
	     children = Els} ->
	  case attrs_to_body_attrs(Attrs) of
	    {error, _} = Err -> Err;
	    BodyAttrs ->
		case get_attr(rid, BodyAttrs) of
		  <<"">> -> {error, <<"Missing \"rid\" attribute">>};
		  _ ->
		      Els1 = lists:flatmap(fun (#xmlel{} = El) ->
						   [{xmlstreamelement, El}];
					       (_) -> []
					   end,
					   Els),
		      {ok, #body{attrs = BodyAttrs, size = Size, els = Els1}}
		end
	  end;
      #xmlel{} -> {error, <<"Unexpected payload">>};
      _ when Type == xml ->
            {error, <<"XML is not well-formed">>};
      _ when Type == json ->
            {error, <<"JSON is not well-formed">>}
    end.

decode(Data, xml) ->
    fxml_stream:parse_element(Data);
decode(Data, json) ->
    Data.

attrs_to_body_attrs(Attrs) ->
    lists:foldl(fun (_, {error, Reason}) -> {error, Reason};
		    ({Attr, Val}, Acc) ->
			try case Attr of
			      <<"ver">> -> [{ver, Val} | Acc];
			      <<"xmpp:version">> ->
				  [{'xmpp:version', Val} | Acc];
			      <<"type">> -> [{type, Val} | Acc];
			      <<"key">> -> [{key, Val} | Acc];
			      <<"newkey">> -> [{newkey, Val} | Acc];
			      <<"xmlns">> -> Val = (?NS_HTTP_BIND), Acc;
			      <<"secure">> -> [{secure, to_bool(Val)} | Acc];
			      <<"xmpp:restart">> ->
				  [{'xmpp:restart', to_bool(Val)} | Acc];
			      <<"to">> ->
				  [{to, jid:nameprep(Val)} | Acc];
			      <<"wait">> -> [{wait, to_int(Val, 0)} | Acc];
			      <<"ack">> -> [{ack, to_int(Val, 0)} | Acc];
			      <<"sid">> -> [{sid, Val} | Acc];
			      <<"hold">> -> [{hold, to_int(Val, 0)} | Acc];
			      <<"rid">> -> [{rid, to_int(Val, 0)} | Acc];
			      <<"pause">> -> [{pause, to_int(Val, 0)} | Acc];
			      _ -> [{Attr, Val} | Acc]
			    end
			catch
			  _:_ ->
			      {error,
			       <<"Invalid \"", Attr/binary, "\" attribute">>}
			end
		end,
		[], Attrs).

to_int(S, Min) ->
    case binary_to_integer(S) of
      I when I >= Min -> I;
      _ -> erlang:error(badarg)
    end.

to_bool(<<"true">>) -> true;
to_bool(<<"1">>) -> true;
to_bool(<<"false">>) -> false;
to_bool(<<"0">>) -> false.

attrs_to_list(Attrs) -> [attr_to_list(A) || A <- Attrs].

attr_to_list({Name, Value}) ->
    [$\s, Name, $=, $', fxml:crypt(Value), $'].

bosh_response(Body, Type) ->
    CType = case Type of
                xml -> ?CT_XML;
                json -> ?CT_JSON
            end,
    {200, Body#body.http_reason, ?HEADER(CType),
     encode_body(Body, Type)}.

bosh_response_with_msg(Body, Type, RcvBody) ->
    ?DEBUG("Send error reply:~p~n** Receiced body: ~p",
	   [Body, RcvBody]),
    bosh_response(Body, Type).

http_error(Status, Reason, Type) ->
    CType = case Type of
                xml -> ?CT_XML;
                json -> ?CT_JSON
            end,
    {Status, Reason, ?HEADER(CType), <<"">>}.

make_sid() -> str:sha(p1_rand:get_string()).

-compile({no_auto_import, [{min, 2}]}).

min(undefined, B) -> B;
min(A, B) -> erlang:min(A, B).

check_bosh_module(XmppDomain) ->
    case gen_mod:is_loaded(XmppDomain, mod_bosh) of
      true -> ok;
      false ->
	  ?ERROR_MSG("You are trying to use BOSH (HTTP Bind) "
		     "in host ~p, but the module mod_bosh "
		     "is not started in that host. Configure "
		     "your BOSH client to connect to the correct "
		     "host, or add your desired host to the "
		     "configuration, or check your 'modules' "
		     "section in your ejabberd configuration "
		     "file.",
		     [XmppDomain])
    end.

get_attr(Attr, Attrs) -> get_attr(Attr, Attrs, <<"">>).

get_attr(Attr, Attrs, Default) ->
    case lists:keysearch(Attr, 1, Attrs) of
      {value, {_, Val}} -> Val;
      _ -> Default
    end.

buf_new(Host) ->
    buf_new(Host, unlimited).

buf_new(Host, Limit) ->
    QueueType = mod_bosh_opt:queue_type(Host),
    p1_queue:new(QueueType, Limit).

buf_in(Xs, Buf) ->
    lists:foldl(fun p1_queue:in/2, Buf, Xs).

buf_out(Buf, Num) when is_integer(Num), Num > 0 ->
    buf_out(Buf, Num, []);
buf_out(Buf, _) -> {p1_queue:to_list(Buf), p1_queue:clear(Buf)}.

buf_out(Buf, 0, Els) -> {lists:reverse(Els), Buf};
buf_out(Buf, I, Els) ->
    case p1_queue:out(Buf) of
      {{value, El}, NewBuf} ->
	  buf_out(NewBuf, I - 1, [El | Els]);
      {empty, _} -> buf_out(Buf, 0, Els)
    end.

restart_timer(TRef, Timeout, Msg) ->
    misc:cancel_timer(TRef),
    erlang:start_timer(timer:seconds(Timeout), self(), Msg).

restart_inactivity_timer(#state{inactivity_timeout =
				    Timeout} =
			     State) ->
    restart_inactivity_timer(State, Timeout).

restart_inactivity_timer(#state{inactivity_timer =
				    TRef} =
			     State,
			 Timeout) ->
    NewTRef = restart_timer(TRef, Timeout, inactive),
    State#state{inactivity_timer = NewTRef}.

stop_inactivity_timer(#state{inactivity_timer = TRef} =
			  State) ->
    misc:cancel_timer(TRef),
    State#state{inactivity_timer = undefined}.

restart_wait_timer(#state{wait_timer = TRef,
			  wait_timeout = Timeout} =
		       State) ->
    NewTRef = restart_timer(TRef, Timeout, wait_timeout),
    State#state{wait_timer = NewTRef}.

stop_wait_timer(#state{wait_timer = TRef} = State) ->
    misc:cancel_timer(TRef), State#state{wait_timer = undefined}.

start_shaper_timer(Timeout) ->
    erlang:start_timer(Timeout, self(), shaper_timeout).

make_random_jid(Host) ->
    User = p1_rand:get_string(),
    jid:make(User, Host, p1_rand:get_string()).

make_socket(Pid, IP) -> {http_bind, Pid, IP}.