diff options
Diffstat (limited to 'src/web/ejabberd_http_bind.erl')
-rw-r--r-- | src/web/ejabberd_http_bind.erl | 1559 |
1 files changed, 770 insertions, 789 deletions
diff --git a/src/web/ejabberd_http_bind.erl b/src/web/ejabberd_http_bind.erl index 02b8d27b..91329167 100644 --- a/src/web/ejabberd_http_bind.erl +++ b/src/web/ejabberd_http_bind.erl @@ -41,11 +41,15 @@ process_request/2]). -include("ejabberd.hrl"). + -include("jlib.hrl"). + -include("ejabberd_http.hrl"). + -include("http_bind.hrl"). --record(http_bind, {id, pid, to, hold, wait, process_delay, version}). +-record(http_bind, + {id, pid, to, hold, wait, process_delay, version}). -define(NULL_PEER, {{0, 0, 0, 0}, 0}). @@ -89,28 +93,39 @@ %%-define(DBGFSM, true). -ifdef(DBGFSM). + -define(FSMOPTS, [{debug, [trace]}]). + -else. + -define(FSMOPTS, []). + -endif. --define(BOSH_VERSION, "1.8"). --define(NS_CLIENT, "jabber:client"). --define(NS_BOSH, "urn:xmpp:xbosh"). --define(NS_HTTP_BIND, "http://jabber.org/protocol/httpbind"). +%% Wait 100ms before continue processing, to allow the client provide more related stanzas. +-define(BOSH_VERSION, <<"1.8">>). --define(MAX_REQUESTS, 2). % number of simultaneous requests --define(MIN_POLLING, 2000000). % don't poll faster than that or we will - % shoot you (time in microsec) --define(MAX_WAIT, 3600). % max num of secs to keep a request on hold --define(MAX_INACTIVITY, 30000). % msecs to wait before terminating - % idle sessions --define(MAX_PAUSE, 120). % may num of sec a client is allowed to pause - % the session +-define(NS_CLIENT, <<"jabber:client">>). + +-define(NS_BOSH, <<"urn:xmpp:xbosh">>). + +-define(NS_HTTP_BIND, + <<"http://jabber.org/protocol/httpbind">>). + +-define(MAX_REQUESTS, 2). + +-define(MIN_POLLING, 2000000). + +-define(MAX_WAIT, 3600). + +-define(MAX_INACTIVITY, 30000). + +-define(MAX_PAUSE, 120). -%% Wait 100ms before continue processing, to allow the client provide more related stanzas. -define(PROCESS_DELAY_DEFAULT, 100). + -define(PROCESS_DELAY_MIN, 0). + -define(PROCESS_DELAY_MAX, 1000). %% Line copied from mod_http_bind.erl @@ -134,10 +149,12 @@ start_link(Sid, Key, IP) -> gen_fsm:start_link(?MODULE, [Sid, Key, IP], ?FSMOPTS). send({http_bind, FsmRef, _IP}, Packet) -> - gen_fsm:sync_send_all_state_event(FsmRef, {send, Packet}). + gen_fsm:sync_send_all_state_event(FsmRef, + {send, Packet}). send_xml({http_bind, FsmRef, _IP}, Packet) -> - gen_fsm:sync_send_all_state_event(FsmRef, {send_xml, Packet}). + gen_fsm:sync_send_all_state_event(FsmRef, + {send_xml, Packet}). setopts({http_bind, FsmRef, _IP}, Opts) -> case lists:member({active, once}, Opts) of @@ -147,142 +164,143 @@ setopts({http_bind, FsmRef, _IP}, Opts) -> ok end. -controlling_process(_Socket, _Pid) -> - ok. +controlling_process(_Socket, _Pid) -> ok. custom_receiver({http_bind, FsmRef, _IP}) -> {receiver, ?MODULE, FsmRef}. become_controller(FsmRef, C2SPid) -> - gen_fsm:send_all_state_event(FsmRef, {become_controller, C2SPid}). + gen_fsm:send_all_state_event(FsmRef, + {become_controller, C2SPid}). reset_stream({http_bind, _FsmRef, _IP}) -> ok. change_shaper({http_bind, FsmRef, _IP}, Shaper) -> - gen_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}). + gen_fsm:send_all_state_event(FsmRef, + {change_shaper, Shaper}). monitor({http_bind, FsmRef, _IP}) -> erlang:monitor(process, FsmRef). close({http_bind, FsmRef, _IP}) -> - catch gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}). + catch gen_fsm:sync_send_all_state_event(FsmRef, + {stop, close}). -sockname(_Socket) -> - {ok, ?NULL_PEER}. +sockname(_Socket) -> {ok, ?NULL_PEER}. -peername({http_bind, _FsmRef, IP}) -> - {ok, IP}. +peername({http_bind, _FsmRef, IP}) -> {ok, IP}. %% Entry point for data coming from client through ejabberd HTTP server: process_request(Data, IP) -> Opts1 = ejabberd_c2s_config:get_c2s_limits(), Opts = [{xml_socket, true} | Opts1], - MaxStanzaSize = - case lists:keysearch(max_stanza_size, 1, Opts) of - {value, {_, Size}} -> Size; - _ -> infinity - end, + MaxStanzaSize = case lists:keysearch(max_stanza_size, 1, + Opts) + of + {value, {_, Size}} -> Size; + _ -> infinity + end, PayloadSize = iolist_size(Data), - case catch parse_request(Data, PayloadSize, MaxStanzaSize) of - %% No existing session: - {ok, {"", Rid, Attrs, Payload}} -> - case xml:get_attr_s("to",Attrs) of - "" -> - ?DEBUG("Session not created (Improper addressing)", []), - {200, ?HEADER, "<body type='terminate' " - "condition='improper-addressing' " - "xmlns='" ++ ?NS_HTTP_BIND ++ "'/>"}; - XmppDomain -> - %% create new session - Sid = sha:sha(term_to_binary({now(), make_ref()})), - case start(XmppDomain, Sid, "", IP) of - {error, _} -> - {200, ?HEADER, "<body type='terminate' " - "condition='internal-server-error' " - "xmlns='" ++ ?NS_HTTP_BIND ++ "'>BOSH module not started</body>"}; - {ok, Pid} -> - handle_session_start( - Pid, XmppDomain, Sid, Rid, Attrs, - Payload, PayloadSize, IP) - end - end; - %% Existing session - {ok, {Sid, Rid, Attrs, Payload1}} -> - StreamStart = - case xml:get_attr_s("xmpp:restart",Attrs) of - "true" -> - true; - _ -> - false - end, - Payload2 = case xml:get_attr_s("type",Attrs) of - "terminate" -> - %% close stream - Payload1 ++ [{xmlstreamend, "stream:stream"}]; - _ -> - Payload1 - end, - handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize, - StreamStart, IP); - {size_limit, Sid} -> - case mnesia:dirty_read({http_bind, Sid}) of - [] -> - {404, ?HEADER, ""}; - [#http_bind{pid = FsmRef}] -> - gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}), - {200, ?HEADER, "<body type='terminate' " - "condition='undefined-condition' " - "xmlns='" ++ ?NS_HTTP_BIND ++ "'>Request Too Large</body>"} - end; - _ -> - ?DEBUG("Received bad request: ~p", [Data]), - {400, ?HEADER, ""} + case catch parse_request(Data, PayloadSize, + MaxStanzaSize) + of + %% No existing session: + {ok, {<<"">>, Rid, Attrs, Payload}} -> + case xml:get_attr_s(<<"to">>, Attrs) of + <<"">> -> + ?DEBUG("Session not created (Improper addressing)", []), + {200, ?HEADER, + <<"<body type='terminate' condition='improper-ad" + "dressing' xmlns='", + (?NS_HTTP_BIND)/binary, "'/>">>}; + XmppDomain -> + Sid = sha:sha(term_to_binary({now(), make_ref()})), + case start(XmppDomain, Sid, <<"">>, IP) of + {error, _} -> + {500, ?HEADER, + <<"<body type='terminate' condition='internal-se" + "rver-error' xmlns='", + (?NS_HTTP_BIND)/binary, + "'>Internal Server Error</body>">>}; + {ok, Pid} -> + handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, + Payload, PayloadSize, IP) + end + end; + %% Existing session + {ok, {Sid, Rid, Attrs, Payload1}} -> + StreamStart = case xml:get_attr_s(<<"xmpp:restart">>, + Attrs) + of + <<"true">> -> true; + _ -> false + end, + Payload2 = case xml:get_attr_s(<<"type">>, Attrs) of + <<"terminate">> -> + Payload1 ++ [{xmlstreamend, <<"stream:stream">>}]; + _ -> Payload1 + end, + handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize, + StreamStart, IP); + {size_limit, Sid} -> + case mnesia:dirty_read({http_bind, Sid}) of + {error, _} -> {404, ?HEADER, <<"">>}; + {ok, #http_bind{pid = FsmRef}} -> + gen_fsm:sync_send_all_state_event(FsmRef, + {stop, close}), + {200, ?HEADER, + <<"<body type='terminate' condition='undefined-c" + "ondition' xmlns='", + (?NS_HTTP_BIND)/binary, "'>Request Too Large</body>">>} + end; + _ -> + ?DEBUG("Received bad request: ~p", [Data]), + {400, ?HEADER, <<"">>} end. handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, PayloadSize, IP) -> ?DEBUG("got pid: ~p", [Pid]), - Wait = case string:to_integer(xml:get_attr_s("wait",Attrs)) of - {error, _} -> - ?MAX_WAIT; - {CWait, _} -> - if - (CWait > ?MAX_WAIT) -> - ?MAX_WAIT; - true -> - CWait - end + Wait = case str:to_integer(xml:get_attr_s(<<"wait">>, + Attrs)) + of + {error, _} -> ?MAX_WAIT; + {CWait, _} -> + if CWait > (?MAX_WAIT) -> ?MAX_WAIT; + true -> CWait + end end, - Hold = case string:to_integer(xml:get_attr_s("hold",Attrs)) of - {error, _} -> - (?MAX_REQUESTS - 1); - {CHold, _} -> - if - (CHold > (?MAX_REQUESTS - 1)) -> - (?MAX_REQUESTS - 1); - true -> - CHold - end + Hold = case str:to_integer(xml:get_attr_s(<<"hold">>, + Attrs)) + of + {error, _} -> (?MAX_REQUESTS) - 1; + {CHold, _} -> + if CHold > (?MAX_REQUESTS) - 1 -> (?MAX_REQUESTS) - 1; + true -> CHold + end end, - Pdelay = case string:to_integer(xml:get_attr_s("process-delay",Attrs)) of - {error, _} -> - ?PROCESS_DELAY_DEFAULT; - {CPdelay, _} when - (?PROCESS_DELAY_MIN =< CPdelay) and - (CPdelay =< ?PROCESS_DELAY_MAX) -> - CPdelay; - {CPdelay, _} -> - lists:max([lists:min([CPdelay, ?PROCESS_DELAY_MAX]), ?PROCESS_DELAY_MIN]) + Pdelay = case + str:to_integer(xml:get_attr_s(<<"process-delay">>, + Attrs)) + of + {error, _} -> ?PROCESS_DELAY_DEFAULT; + {CPdelay, _} + when ((?PROCESS_DELAY_MIN) =< CPdelay) and + (CPdelay =< (?PROCESS_DELAY_MAX)) -> + CPdelay; + {CPdelay, _} -> + lists:max([lists:min([CPdelay, ?PROCESS_DELAY_MAX]), + ?PROCESS_DELAY_MIN]) end, - Version = - case catch list_to_float( - xml:get_attr_s("ver", Attrs)) of - {'EXIT', _} -> 0.0; - V -> V - end, - XmppVersion = xml:get_attr_s("xmpp:version", Attrs), + Version = case catch + list_to_float(binary_to_list(xml:get_attr_s(<<"ver">>, Attrs))) + of + {'EXIT', _} -> 0.0; + V -> V + end, + XmppVersion = xml:get_attr_s(<<"xmpp:version">>, Attrs), ?DEBUG("Create session: ~p", [Sid]), mnesia:dirty_write( #http_bind{id = Sid, @@ -309,16 +327,8 @@ handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, %%---------------------------------------------------------------------- init([Sid, Key, IP]) -> ?DEBUG("started: ~p", [{Sid, Key, IP}]), - - %% Read c2s options from the first ejabberd_c2s configuration in - %% the config file listen section - %% TODO: We should have different access and shaper values for - %% each connector. The default behaviour should be however to use - %% the default c2s restrictions if not defined for the current - %% connector. Opts1 = ejabberd_c2s_config:get_c2s_limits(), Opts = [{xml_socket, true} | Opts1], - Shaper = none, ShaperState = shaper:new(Shaper), Socket = {http_bind, self(), IP}, @@ -340,22 +350,21 @@ init([Sid, Key, IP]) -> %%---------------------------------------------------------------------- handle_event({become_controller, C2SPid}, StateName, StateData) -> case StateData#state.input of - cancel -> - {next_state, StateName, StateData#state{ - waiting_input = C2SPid}}; - Input -> - lists:foreach( - fun(Event) -> - C2SPid ! Event - end, queue:to_list(Input)), - {next_state, StateName, StateData#state{ - input = queue:new(), - waiting_input = C2SPid}} + cancel -> + {next_state, StateName, + StateData#state{waiting_input = C2SPid}}; + Input -> + lists:foreach(fun (Event) -> C2SPid ! Event end, + queue:to_list(Input)), + {next_state, StateName, + StateData#state{input = queue:new(), + waiting_input = C2SPid}} end; - -handle_event({change_shaper, Shaper}, StateName, StateData) -> +handle_event({change_shaper, Shaper}, StateName, + StateData) -> NewShaperState = shaper:new(Shaper), - {next_state, StateName, StateData#state{shaper_state = NewShaperState}}; + {next_state, StateName, + StateData#state{shaper_state = NewShaperState}}; handle_event(_Event, StateName, StateData) -> {next_state, StateName, StateData}. @@ -372,13 +381,16 @@ handle_sync_event({send_xml, Packet}, _From, StateName, #state{http_receiver = undefined} = StateData) -> Output = [Packet | StateData#state.output], Reply = ok, - {reply, Reply, StateName, StateData#state{output = Output}}; + {reply, Reply, StateName, + StateData#state{output = Output}}; handle_sync_event({send_xml, Packet}, _From, StateName, #state{out_of_order_receiver = true} = StateData) -> Output = [Packet | StateData#state.output], Reply = ok, - {reply, Reply, StateName, StateData#state{output = Output}}; -handle_sync_event({send_xml, Packet}, _From, StateName, StateData) -> + {reply, Reply, StateName, + StateData#state{output = Output}}; +handle_sync_event({send_xml, Packet}, _From, StateName, + StateData) -> Output = [Packet | StateData#state.output], cancel_timer(StateData#state.timer), Timer = set_inactivity_timer(StateData#state.pause, @@ -387,19 +399,14 @@ handle_sync_event({send_xml, Packet}, _From, StateName, StateData) -> gen_fsm:reply(StateData#state.http_receiver, HTTPReply), cancel_timer(StateData#state.wait_timer), Rid = StateData#state.rid, - ReqList = [#hbr{rid = Rid, - key = StateData#state.key, - out = Output - } | - [El || El <- StateData#state.req_list, - El#hbr.rid /= Rid ] - ], + ReqList = [#hbr{rid = Rid, key = StateData#state.key, + out = Output} + | [El + || El <- StateData#state.req_list, El#hbr.rid /= Rid]], Reply = ok, {reply, Reply, StateName, - StateData#state{output = [], - http_receiver = undefined, - req_list = ReqList, - wait_timer = undefined, + StateData#state{output = [], http_receiver = undefined, + req_list = ReqList, wait_timer = undefined, timer = Timer}}; handle_sync_event({stop,close}, _From, _StateName, StateData) -> @@ -412,32 +419,30 @@ handle_sync_event({stop,Reason}, _From, _StateName, StateData) -> ?DEBUG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]), Reply = ok, {stop, normal, Reply, StateData}; - %% HTTP PUT: Receive packets from the client -handle_sync_event(#http_put{rid = Rid}, - _From, StateName, StateData) - when StateData#state.shaper_timer /= undefined -> - Pause = - case erlang:read_timer(StateData#state.shaper_timer) of - false -> - 0; - P -> P - end, +handle_sync_event(#http_put{rid = Rid}, _From, + StateName, StateData) + when StateData#state.shaper_timer /= undefined -> + Pause = case + erlang:read_timer(StateData#state.shaper_timer) + of + false -> 0; + P -> P + end, Reply = {wait, Pause}, ?DEBUG("Shaper timer for RID ~p: ~p", [Rid, Reply]), {reply, Reply, StateName, StateData}; - -handle_sync_event(#http_put{payload_size = PayloadSize} = Request, +handle_sync_event(#http_put{payload_size = + PayloadSize} = + Request, _From, StateName, StateData) -> - ?DEBUG("New request: ~p",[Request]), - %% Updating trafic shaper + ?DEBUG("New request: ~p", [Request]), {NewShaperState, NewShaperTimer} = - update_shaper(StateData#state.shaper_state, PayloadSize), - + update_shaper(StateData#state.shaper_state, + PayloadSize), handle_http_put_event(Request, StateName, StateData#state{shaper_state = NewShaperState, shaper_timer = NewShaperTimer}); - %% HTTP GET: send packets to the client handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) -> %% setup timer @@ -490,14 +495,13 @@ handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) -> req_list = ReqList}} end end; - -handle_sync_event(peername, _From, StateName, StateData) -> +handle_sync_event(peername, _From, StateName, + StateData) -> Reply = {ok, StateData#state.ip}, {reply, Reply, StateName, StateData}; - -handle_sync_event(_Event, _From, StateName, StateData) -> - Reply = ok, - {reply, Reply, StateName, StateData}. +handle_sync_event(_Event, _From, StateName, + StateData) -> + Reply = ok, {reply, Reply, StateName, StateData}. code_change(_OldVsn, StateName, StateData, _Extra) -> {ok, StateName, StateData}. @@ -510,35 +514,30 @@ code_change(_OldVsn, StateName, StateData, _Extra) -> %%---------------------------------------------------------------------- %% We reached the max_inactivity timeout: handle_info({timeout, Timer, _}, _StateName, - #state{id=SID, timer = Timer} = StateData) -> - ?INFO_MSG("Session timeout. Closing the HTTP bind session: ~p", [SID]), + #state{id = SID, timer = Timer} = StateData) -> + ?INFO_MSG("Session timeout. Closing the HTTP bind " + "session: ~p", + [SID]), {stop, normal, StateData}; - handle_info({timeout, WaitTimer, _}, StateName, #state{wait_timer = WaitTimer} = StateData) -> - if - StateData#state.http_receiver /= undefined -> - cancel_timer(StateData#state.timer), - Timer = set_inactivity_timer(StateData#state.pause, - StateData#state.max_inactivity), - gen_fsm:reply(StateData#state.http_receiver, {ok, empty}), - Rid = StateData#state.rid, - ReqList = [#hbr{rid = Rid, - key = StateData#state.key, - out = [] - } | - [El || El <- StateData#state.req_list, - El#hbr.rid /= Rid ] - ], - {next_state, StateName, - StateData#state{http_receiver = undefined, - req_list = ReqList, - wait_timer = undefined, - timer = Timer}}; - true -> - {next_state, StateName, StateData} + if StateData#state.http_receiver /= undefined -> + cancel_timer(StateData#state.timer), + Timer = set_inactivity_timer(StateData#state.pause, + StateData#state.max_inactivity), + gen_fsm:reply(StateData#state.http_receiver, + {ok, empty}), + Rid = StateData#state.rid, + ReqList = [#hbr{rid = Rid, key = StateData#state.key, + out = []} + | [El + || El <- StateData#state.req_list, El#hbr.rid /= Rid]], + {next_state, StateName, + StateData#state{http_receiver = undefined, + req_list = ReqList, wait_timer = undefined, + timer = Timer}}; + true -> {next_state, StateName, StateData} end; - handle_info({timeout, ShaperTimer, _}, StateName, #state{shaper_timer = ShaperTimer} = StateData) -> {next_state, StateName, StateData#state{shaper_timer = undefined}}; @@ -552,14 +551,14 @@ handle_info(_, StateName, StateData) -> %% Returns: any %%---------------------------------------------------------------------- terminate(_Reason, _StateName, StateData) -> - ?DEBUG("terminate: Deleting session ~s", [StateData#state.id]), + ?DEBUG("terminate: Deleting session ~s", + [StateData#state.id]), mnesia:dirty_delete({http_bind, StateData#state.id}), - send_receiver_reply(StateData#state.http_receiver, {ok, terminate}), + send_receiver_reply(StateData#state.http_receiver, + {ok, terminate}), case StateData#state.waiting_input of - false -> - ok; - C2SPid -> - gen_fsm:send_event(C2SPid, closed) + false -> ok; + C2SPid -> gen_fsm:send_event(C2SPid, closed) end, ok. @@ -568,250 +567,227 @@ terminate(_Reason, _StateName, StateData) -> %%%---------------------------------------------------------------------- %% PUT / Get processing: -handle_http_put_event(#http_put{rid = Rid, attrs = Attrs, - hold = Hold} = Request, +handle_http_put_event(#http_put{rid = Rid, + attrs = Attrs, hold = Hold} = + Request, StateName, StateData) -> - ?DEBUG("New request: ~p",[Request]), - %% Check if Rid valid - RidAllow = rid_allow(StateData#state.rid, Rid, Attrs, Hold, - StateData#state.max_pause), - - %% Check if Rid is in sequence or out of sequence: + ?DEBUG("New request: ~p", [Request]), + RidAllow = rid_allow(StateData#state.rid, Rid, Attrs, + Hold, StateData#state.max_pause), case RidAllow of - buffer -> - ?DEBUG("Buffered request: ~p", [Request]), - %% Request is out of sequence: - PendingRequests = StateData#state.unprocessed_req_list, - %% In case an existing RID was already buffered: - Requests = lists:keydelete(Rid, 2, PendingRequests), - ReqList = [#hbr{rid = Rid, - key = StateData#state.key, - out = [] - } | - [El || El <- StateData#state.req_list, - El#hbr.rid > (Rid - 1 - Hold)] - ], - ?DEBUG("reqlist: ~p", [ReqList]), - UnprocessedReqList = [Request | Requests], - cancel_timer(StateData#state.timer), - Timer = set_inactivity_timer(0, StateData#state.max_inactivity), - {reply, ok, StateName, - StateData#state{unprocessed_req_list = UnprocessedReqList, - req_list = ReqList, - timer = Timer}, hibernate}; - - _ -> - %% Request is in sequence: - process_http_put(Request, StateName, StateData, RidAllow) + buffer -> + ?DEBUG("Buffered request: ~p", [Request]), + PendingRequests = StateData#state.unprocessed_req_list, + Requests = lists:keydelete(Rid, 2, PendingRequests), + ReqList = [#hbr{rid = Rid, key = StateData#state.key, + out = []} + | [El + || El <- StateData#state.req_list, + El#hbr.rid > Rid - 1 - Hold]], + ?DEBUG("reqlist: ~p", [ReqList]), + UnprocessedReqList = [Request | Requests], + cancel_timer(StateData#state.timer), + Timer = set_inactivity_timer(0, + StateData#state.max_inactivity), + {reply, ok, StateName, + StateData#state{unprocessed_req_list = + UnprocessedReqList, + req_list = ReqList, timer = Timer}, + hibernate}; + _ -> + process_http_put(Request, StateName, StateData, + RidAllow) end. -process_http_put(#http_put{rid = Rid, attrs = Attrs, payload = Payload, - hold = Hold, stream = StreamTo, - ip = IP} = Request, +process_http_put(#http_put{rid = Rid, attrs = Attrs, + payload = Payload, hold = Hold, stream = StreamTo, + ip = IP} = + Request, StateName, StateData, RidAllow) -> ?DEBUG("Actually processing request: ~p", [Request]), - %% Check if key valid - Key = xml:get_attr_s("key", Attrs), - NewKey = xml:get_attr_s("newkey", Attrs), - KeyAllow = - case RidAllow of - repeat -> - true; - false -> - false; - {true, _} -> - case StateData#state.key of - "" -> - true; - OldKey -> - NextKey = sha:sha(Key), - ?DEBUG("Key/OldKey/NextKey: ~s/~s/~s", [Key, OldKey, NextKey]), - if - OldKey == NextKey -> - true; - true -> - ?DEBUG("wrong key: ~s",[Key]), - false - end - end - end, + Key = xml:get_attr_s(<<"key">>, Attrs), + NewKey = xml:get_attr_s(<<"newkey">>, Attrs), + KeyAllow = case RidAllow of + repeat -> true; + false -> false; + {true, _} -> + case StateData#state.key of + <<"">> -> true; + OldKey -> + NextKey = sha:sha(Key), + ?DEBUG("Key/OldKey/NextKey: ~s/~s/~s", + [Key, OldKey, NextKey]), + if OldKey == NextKey -> true; + true -> ?DEBUG("wrong key: ~s", [Key]), false + end + end + end, TNow = tnow(), - LastPoll = if - Payload == [] -> - TNow; - true -> - 0 + LastPoll = if Payload == [] -> TNow; + true -> 0 end, - if - (Payload == []) and - (Hold == 0) and - (TNow - StateData#state.last_poll < ?MIN_POLLING) -> - Reply = {error, polling_too_frequently}, - {reply, Reply, StateName, StateData}; - KeyAllow -> - case RidAllow of - false -> - Reply = {error, not_exists}, - {reply, Reply, StateName, StateData}; - repeat -> - ?DEBUG("REPEATING ~p", [Rid]), - case [El#hbr.out || - El <- StateData#state.req_list, - El#hbr.rid == Rid] of - [] -> - {error, not_exists}; - [Out | _XS] -> - if (Rid == StateData#state.rid) and - (StateData#state.http_receiver /= undefined) -> - {reply, ok, StateName, StateData}; - true -> - Reply = {repeat, lists:reverse(Out)}, - {reply, Reply, StateName, StateData#state{last_poll = LastPoll}} - end - end; - {true, Pause} -> - SaveKey = if - NewKey == "" -> - Key; - true -> - NewKey - end, - ?DEBUG(" -- SaveKey: ~s~n", [SaveKey]), - - %% save request - ReqList1 = - [El || El <- StateData#state.req_list, - El#hbr.rid > (Rid - 1 - Hold)], - ReqList = - case lists:keymember(Rid, #hbr.rid, ReqList1) of - true -> - ReqList1; - false -> - [#hbr{rid = Rid, - key = StateData#state.key, - out = [] - } | - ReqList1 - ] - end, - ?DEBUG("reqlist: ~p", [ReqList]), - - %% setup next timer - cancel_timer(StateData#state.timer), - Timer = set_inactivity_timer(Pause, - StateData#state.max_inactivity), - case StateData#state.waiting_input of - false -> - Input = - lists:foldl( - fun queue:in/2, - StateData#state.input, Payload), - Reply = ok, - process_buffered_request(Reply, StateName, - StateData#state{input = Input, - rid = Rid, - key = SaveKey, - ctime = TNow, - timer = Timer, - pause = Pause, - last_poll = LastPoll, - req_list = ReqList, - ip = IP - }); - C2SPid -> - case StreamTo of - {To, ""} -> - gen_fsm:send_event( - C2SPid, - {xmlstreamstart, "stream:stream", - [{"to", To}, - {"xmlns", ?NS_CLIENT}, - {"xmlns:stream", ?NS_STREAM}]}); - {To, Version} -> - gen_fsm:send_event( - C2SPid, - {xmlstreamstart, "stream:stream", - [{"to", To}, - {"xmlns", ?NS_CLIENT}, - {"version", Version}, - {"xmlns:stream", ?NS_STREAM}]}); - _ -> - ok - end, - - MaxInactivity = get_max_inactivity(StreamTo, StateData#state.max_inactivity), - MaxPause = get_max_inactivity(StreamTo, StateData#state.max_pause), - - ?DEBUG("really sending now: ~p", [Payload]), - lists:foreach( - fun({xmlstreamend, End}) -> - gen_fsm:send_event( - C2SPid, {xmlstreamend, End}); - (El) -> - gen_fsm:send_event( - C2SPid, {xmlstreamelement, El}) - end, Payload), - Reply = ok, - process_buffered_request(Reply, StateName, - StateData#state{input = queue:new(), - rid = Rid, - key = SaveKey, - ctime = TNow, - timer = Timer, - pause = Pause, - last_poll = LastPoll, - req_list = ReqList, - max_inactivity = MaxInactivity, - max_pause = MaxPause, - ip = IP - }) - end - end; - true -> - Reply = {error, bad_key}, - {reply, Reply, StateName, StateData} + if (Payload == []) and (Hold == 0) and + (TNow - StateData#state.last_poll < (?MIN_POLLING)) -> + Reply = {error, polling_too_frequently}, + {reply, Reply, StateName, StateData}; + KeyAllow -> + case RidAllow of + false -> + Reply = {error, not_exists}, + {reply, Reply, StateName, StateData}; + repeat -> + ?DEBUG("REPEATING ~p", [Rid]), + case [El#hbr.out + || El <- StateData#state.req_list, El#hbr.rid == Rid] + of + [] -> {error, not_exists}; + [Out | _XS] -> + if (Rid == StateData#state.rid) and + (StateData#state.http_receiver /= undefined) -> + {reply, ok, StateName, StateData}; + true -> + Reply = {repeat, lists:reverse(Out)}, + {reply, Reply, StateName, + StateData#state{last_poll = LastPoll}} + end + end; + {true, Pause} -> + SaveKey = if NewKey == <<"">> -> Key; + true -> NewKey + end, + ?DEBUG(" -- SaveKey: ~s~n", [SaveKey]), + ReqList1 = [El + || El <- StateData#state.req_list, + El#hbr.rid > Rid - 1 - Hold], + ReqList = case lists:keymember(Rid, #hbr.rid, ReqList1) + of + true -> ReqList1; + false -> + [#hbr{rid = Rid, key = StateData#state.key, + out = []} + | ReqList1] + end, + ?DEBUG("reqlist: ~p", [ReqList]), + cancel_timer(StateData#state.timer), + Timer = set_inactivity_timer(Pause, + StateData#state.max_inactivity), + case StateData#state.waiting_input of + false -> + Input = lists:foldl(fun queue:in/2, + StateData#state.input, Payload), + Reply = ok, + process_buffered_request(Reply, StateName, + StateData#state{input = Input, + rid = Rid, + key = SaveKey, + ctime = TNow, + timer = Timer, + pause = Pause, + last_poll = + LastPoll, + req_list = + ReqList, + ip = IP}); + C2SPid -> + case StreamTo of + {To, <<"">>} -> + gen_fsm:send_event(C2SPid, + {xmlstreamstart, + <<"stream:stream">>, + [{<<"to">>, To}, + {<<"xmlns">>, ?NS_CLIENT}, + {<<"xmlns:stream">>, + ?NS_STREAM}]}); + {To, Version} -> + gen_fsm:send_event(C2SPid, + {xmlstreamstart, + <<"stream:stream">>, + [{<<"to">>, To}, + {<<"xmlns">>, ?NS_CLIENT}, + {<<"version">>, Version}, + {<<"xmlns:stream">>, + ?NS_STREAM}]}); + _ -> ok + end, + MaxInactivity = get_max_inactivity(StreamTo, + StateData#state.max_inactivity), + MaxPause = get_max_inactivity(StreamTo, + StateData#state.max_pause), + ?DEBUG("really sending now: ~p", [Payload]), + lists:foreach(fun ({xmlstreamend, End}) -> + gen_fsm:send_event(C2SPid, + {xmlstreamend, + End}); + (El) -> + gen_fsm:send_event(C2SPid, + {xmlstreamelement, + El}) + end, + Payload), + Reply = ok, + process_buffered_request(Reply, StateName, + StateData#state{input = + queue:new(), + rid = Rid, + key = SaveKey, + ctime = TNow, + timer = Timer, + pause = Pause, + last_poll = + LastPoll, + req_list = + ReqList, + max_inactivity = + MaxInactivity, + max_pause = + MaxPause, + ip = IP}) + end + end; + true -> + Reply = {error, bad_key}, + {reply, Reply, StateName, StateData} end. process_buffered_request(Reply, StateName, StateData) -> Rid = StateData#state.rid, Requests = StateData#state.unprocessed_req_list, - case lists:keysearch(Rid+1, 2, Requests) of - {value, Request} -> - ?DEBUG("Processing buffered request: ~p", [Request]), - NewRequests = lists:keydelete(Rid+1, 2, Requests), - handle_http_put_event( - Request, StateName, - StateData#state{unprocessed_req_list = NewRequests}); - _ -> - {reply, Reply, StateName, StateData, hibernate} + case lists:keysearch(Rid + 1, 2, Requests) of + {value, Request} -> + ?DEBUG("Processing buffered request: ~p", [Request]), + NewRequests = lists:keydelete(Rid + 1, 2, Requests), + handle_http_put_event(Request, StateName, + StateData#state{unprocessed_req_list = + NewRequests}); + _ -> {reply, Reply, StateName, StateData, hibernate} end. -handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> - case http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) of - {error, not_exists} -> - ?DEBUG("no session associated with sid: ~p", [Sid]), - {404, ?HEADER, ""}; - {{error, Reason}, Sess} -> - ?DEBUG("Error on HTTP put. Reason: ~p", [Reason]), - handle_http_put_error(Reason, Sess); - {{repeat, OutPacket}, Sess} -> - ?DEBUG("http_put said 'repeat!' ...~nOutPacket: ~p", [OutPacket]), - send_outpacket(Sess, OutPacket); - {{wait, Pause}, _Sess} -> - ?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]), - timer:sleep(Pause), - %{200, ?HEADER, - % xml:element_to_binary( - % {xmlelement, "body", - % [{"xmlns", ?NS_HTTP_BIND}, - % {"type", "error"}], []})}; - handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, - StreamStart, IP); - {ok, Sess} -> - prepare_response(Sess, Rid, [], StreamStart) +handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, + StreamStart, IP) -> + case http_put(Sid, Rid, Attrs, Payload, PayloadSize, + StreamStart, IP) + of + {error, not_exists} -> + ?DEBUG("no session associated with sid: ~p", [Sid]), + {404, ?HEADER, <<"">>}; + {{error, Reason}, Sess} -> + ?DEBUG("Error on HTTP put. Reason: ~p", [Reason]), + handle_http_put_error(Reason, Sess); + {{repeat, OutPacket}, Sess} -> + ?DEBUG("http_put said 'repeat!' ...~nOutPacket: ~p", + [OutPacket]), + send_outpacket(Sess, OutPacket); + {{wait, Pause}, _Sess} -> + ?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]), + timer:sleep(Pause), + handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, + StreamStart, IP); + {ok, Sess} -> + prepare_response(Sess, Rid, [], StreamStart) end. -http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> +http_put(Sid, Rid, Attrs, Payload, PayloadSize, + StreamStart, IP) -> ?DEBUG("Looking for session: ~p", [Sid]), case mnesia:dirty_read({http_bind, Sid}) of [] -> @@ -822,7 +798,7 @@ http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> true -> {To, StreamVersion}; _ -> - "" + <<"">> end, {gen_fsm:sync_send_all_state_event( FsmRef, #http_put{rid = Rid, attrs = Attrs, payload = Payload, @@ -830,425 +806,430 @@ http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> stream = NewStream, ip = IP}, 30000), Sess} end. -handle_http_put_error(Reason, #http_bind{pid=FsmRef, version=Version}) - when Version >= 0 -> - gen_fsm:sync_send_all_state_event(FsmRef, {stop, {put_error,Reason}}), +handle_http_put_error(Reason, + #http_bind{pid = FsmRef, version = Version}) + when Version >= 0 -> + gen_fsm:sync_send_all_state_event(FsmRef, + {stop, {put_error, Reason}}), case Reason of - not_exists -> - {200, ?HEADER, - xml:element_to_binary( - {xmlelement, "body", - [{"xmlns", ?NS_HTTP_BIND}, - {"type", "terminate"}, - {"condition", "item-not-found"}], []})}; - bad_key -> - {200, ?HEADER, - xml:element_to_binary( - {xmlelement, "body", - [{"xmlns", ?NS_HTTP_BIND}, - {"type", "terminate"}, - {"condition", "item-not-found"}], []})}; - polling_too_frequently -> - {200, ?HEADER, - xml:element_to_binary( - {xmlelement, "body", - [{"xmlns", ?NS_HTTP_BIND}, - {"type", "terminate"}, - {"condition", "policy-violation"}], []})} + not_exists -> + {200, ?HEADER, + xml:element_to_binary(#xmlel{name = <<"body">>, + attrs = + [{<<"xmlns">>, ?NS_HTTP_BIND}, + {<<"type">>, <<"terminate">>}, + {<<"condition">>, + <<"item-not-found">>}], + children = []})}; + bad_key -> + {200, ?HEADER, + xml:element_to_binary(#xmlel{name = <<"body">>, + attrs = + [{<<"xmlns">>, ?NS_HTTP_BIND}, + {<<"type">>, <<"terminate">>}, + {<<"condition">>, + <<"item-not-found">>}], + children = []})}; + polling_too_frequently -> + {200, ?HEADER, + xml:element_to_binary(#xmlel{name = <<"body">>, + attrs = + [{<<"xmlns">>, ?NS_HTTP_BIND}, + {<<"type">>, <<"terminate">>}, + {<<"condition">>, + <<"policy-violation">>}], + children = []})} end; -handle_http_put_error(Reason, #http_bind{pid=FsmRef}) -> - gen_fsm:sync_send_all_state_event(FsmRef,{stop, {put_error_no_version, Reason}}), +handle_http_put_error(Reason, + #http_bind{pid = FsmRef}) -> + gen_fsm:sync_send_all_state_event(FsmRef, + {stop, {put_error_no_version, Reason}}), case Reason of - not_exists -> %% bad rid - ?DEBUG("Closing HTTP bind session (Bad rid).", []), - {404, ?HEADER, ""}; - bad_key -> - ?DEBUG("Closing HTTP bind session (Bad key).", []), - {404, ?HEADER, ""}; - polling_too_frequently -> - ?DEBUG("Closing HTTP bind session (User polling too frequently).", []), - {403, ?HEADER, ""} + not_exists -> %% bad rid + ?DEBUG("Closing HTTP bind session (Bad rid).", []), + {404, ?HEADER, <<"">>}; + bad_key -> + ?DEBUG("Closing HTTP bind session (Bad key).", []), + {404, ?HEADER, <<"">>}; + polling_too_frequently -> + ?DEBUG("Closing HTTP bind session (User polling " + "too frequently).", + []), + {403, ?HEADER, <<"">>} end. %% Control RID ordering rid_allow(none, _NewRid, _Attrs, _Hold, _MaxPause) -> - %% First request - nothing saved so far {true, 0}; rid_allow(OldRid, NewRid, Attrs, Hold, MaxPause) -> - ?DEBUG("Previous rid / New rid: ~p/~p", [OldRid, NewRid]), + ?DEBUG("Previous rid / New rid: ~p/~p", + [OldRid, NewRid]), if - %% We did not miss any packet, we can process it immediately: - NewRid == OldRid + 1 -> - case catch list_to_integer( - xml:get_attr_s("pause", Attrs)) of - {'EXIT', _} -> - {true, 0}; - Pause1 when Pause1 =< MaxPause -> - ?DEBUG("got pause: ~p", [Pause1]), - {true, Pause1}; - _ -> - {true, 0} - end; - %% We have missed packets, we need to cached it to process it later on: - (OldRid < NewRid) and - (NewRid =< (OldRid + Hold + 1)) -> - buffer; - (NewRid =< OldRid) and - (NewRid > OldRid - Hold - 1) -> - repeat; - true -> - false + %% We did not miss any packet, we can process it immediately: + NewRid == OldRid + 1 -> + case catch + jlib:binary_to_integer(xml:get_attr_s(<<"pause">>, + Attrs)) + of + {'EXIT', _} -> {true, 0}; + Pause1 when Pause1 =< MaxPause -> + ?DEBUG("got pause: ~p", [Pause1]), {true, Pause1}; + _ -> {true, 0} + end; + %% We have missed packets, we need to cached it to process it later on: + (OldRid < NewRid) and (NewRid =< OldRid + Hold + 1) -> + buffer; + (NewRid =< OldRid) and (NewRid > OldRid - Hold - 1) -> + repeat; + true -> false end. update_shaper(ShaperState, PayloadSize) -> - {NewShaperState, Pause} = shaper:update(ShaperState, PayloadSize), - if - Pause > 0 -> - ShaperTimer = erlang:start_timer(Pause, self(), activate), %% MR: Seems timer is not needed. Activate is not handled - {NewShaperState, ShaperTimer}; - true -> - {NewShaperState, undefined} + {NewShaperState, Pause} = shaper:update(ShaperState, + PayloadSize), + if Pause > 0 -> + ShaperTimer = erlang:start_timer(Pause, self(), + activate), + {NewShaperState, ShaperTimer}; + true -> {NewShaperState, undefined} end. prepare_response(Sess, Rid, OutputEls, StreamStart) -> - receive after Sess#http_bind.process_delay -> ok end, + receive after Sess#http_bind.process_delay -> ok end, case catch http_get(Sess, Rid) of - {ok, cancel} -> - %% actually it would be better if we could completely - %% cancel this request, but then we would have to hack - %% ejabberd_http and I'm too lazy now - {200, ?HEADER, "<body type='error' xmlns='"++?NS_HTTP_BIND++"'/>"}; - {ok, empty} -> - {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"}; - {ok, terminate} -> - {200, ?HEADER, "<body type='terminate' xmlns='"++?NS_HTTP_BIND++"'/>"}; - {ok, ROutPacket} -> - OutPacket = lists:reverse(ROutPacket), - ?DEBUG("OutPacket: ~p", [OutputEls++OutPacket]), - prepare_outpacket_response(Sess, Rid, OutputEls++OutPacket, StreamStart); - {'EXIT', {shutdown, _}} -> - {200, ?HEADER, "<body type='terminate' condition='system-shutdown' xmlns='"++?NS_HTTP_BIND++"'/>"}; - {'EXIT', _Reason} -> - {200, ?HEADER, "<body type='terminate' xmlns='"++?NS_HTTP_BIND++"'/>"} + {ok, cancel} -> + {200, ?HEADER, + <<"<body type='error' xmlns='", (?NS_HTTP_BIND)/binary, + "'/>">>}; + {ok, empty} -> + {200, ?HEADER, + <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'/>">>}; + {ok, terminate} -> + {200, ?HEADER, + <<"<body type='terminate' xmlns='", + (?NS_HTTP_BIND)/binary, "'/>">>}; + {ok, ROutPacket} -> + OutPacket = lists:reverse(ROutPacket), + ?DEBUG("OutPacket: ~p", [OutputEls ++ OutPacket]), + prepare_outpacket_response(Sess, Rid, + OutputEls ++ OutPacket, StreamStart); + {'EXIT', {shutdown, _}} -> + {200, ?HEADER, + <<"<body type='terminate' condition='system-shut" + "down' xmlns='", + (?NS_HTTP_BIND)/binary, "'/>">>}; + {'EXIT', _Reason} -> + {200, ?HEADER, + <<"<body type='terminate' xmlns='", + (?NS_HTTP_BIND)/binary, "'/>">>} end. %% Send output payloads on establised sessions -prepare_outpacket_response(Sess, _Rid, OutPacket, false) -> +prepare_outpacket_response(Sess, _Rid, OutPacket, + false) -> case catch send_outpacket(Sess, OutPacket) of - {'EXIT', _Reason} -> - ?DEBUG("Error in sending packet ~p ", [_Reason]), - {200, ?HEADER, - "<body type='terminate' xmlns='"++ - ?NS_HTTP_BIND++"'/>"}; - SendRes -> - SendRes + {'EXIT', _Reason} -> + ?DEBUG("Error in sending packet ~p ", [_Reason]), + {200, ?HEADER, + <<"<body type='terminate' xmlns='", + (?NS_HTTP_BIND)/binary, "'/>">>}; + SendRes -> SendRes end; %% Handle a new session along with its output payload -prepare_outpacket_response(#http_bind{id=Sid, wait=Wait, - hold=Hold, to=To}=_Sess, - _Rid, OutPacket, true) -> +prepare_outpacket_response(#http_bind{id = Sid, + wait = Wait, hold = Hold, to = To} = + _Sess, + _Rid, OutPacket, true) -> case OutPacket of - [{xmlstreamstart, _, OutAttrs} | Els] -> - AuthID = xml:get_attr_s("id", OutAttrs), - From = xml:get_attr_s("from", OutAttrs), - Version = xml:get_attr_s("version", OutAttrs), - OutEls = - case Els of - [] -> - []; - [{xmlstreamelement, - {xmlelement, "stream:features", - StreamAttribs, StreamEls}} - | StreamTail] -> - TypedTail = - [check_default_xmlns(OEl) || - {xmlstreamelement, OEl} <- - StreamTail], - [{xmlelement, "stream:features", - [{"xmlns:stream", - ?NS_STREAM}] ++ - StreamAttribs, StreamEls}] ++ - TypedTail; - StreamTail -> - [check_default_xmlns(OEl) || - {xmlstreamelement, OEl} <- - StreamTail] - end, - case OutEls of - [{xmlelement, - "stream:error",_,_}] -> - {200, ?HEADER, "<body type='terminate' " - "condition='host-unknown' " - "xmlns='"++?NS_HTTP_BIND++"'/>"}; - _ -> - BOSH_attribs = - [{"authid", AuthID}, - {"xmlns:xmpp", ?NS_BOSH}, - {"xmlns:stream", ?NS_STREAM}] ++ - case OutEls of - [] -> - []; - _ -> - [{"xmpp:version", Version}] - end, - MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY), - MaxPause = get_max_pause(To), - {200, ?HEADER, - xml:element_to_binary( - {xmlelement,"body", - [{"xmlns", - ?NS_HTTP_BIND}, - {"sid", Sid}, - {"wait", integer_to_list(Wait)}, - {"requests", integer_to_list(Hold+1)}, - {"inactivity", - integer_to_list( - trunc(MaxInactivity/1000))}, - {"maxpause", - integer_to_list(MaxPause)}, - {"polling", - integer_to_list( - trunc(?MIN_POLLING/1000000))}, - {"ver", ?BOSH_VERSION}, - {"from", From}, - {"secure", "true"} %% we're always being secure - ] ++ BOSH_attribs,OutEls})} - end; - _ -> - {200, ?HEADER, "<body type='terminate' " - "condition='internal-server-error' " - "xmlns='"++?NS_HTTP_BIND++"'/>"} + [{xmlstreamstart, _, OutAttrs} | Els] -> + AuthID = xml:get_attr_s(<<"id">>, OutAttrs), + From = xml:get_attr_s(<<"from">>, OutAttrs), + Version = xml:get_attr_s(<<"version">>, OutAttrs), + OutEls = case Els of + [] -> []; + [{xmlstreamelement, + #xmlel{name = <<"stream:features">>, + attrs = StreamAttribs, children = StreamEls}} + | StreamTail] -> + TypedTail = [check_default_xmlns(OEl) + || {xmlstreamelement, OEl} <- StreamTail], + [#xmlel{name = <<"stream:features">>, + attrs = + [{<<"xmlns:stream">>, ?NS_STREAM}] ++ + StreamAttribs, + children = StreamEls}] + ++ TypedTail; + StreamTail -> + [check_default_xmlns(OEl) + || {xmlstreamelement, OEl} <- StreamTail] + end, + case OutEls of + [#xmlel{name = <<"stream:error">>}] -> + {200, ?HEADER, + <<"<body type='terminate' condition='host-unknow" + "n' xmlns='", + (?NS_HTTP_BIND)/binary, "'/>">>}; + _ -> + BOSH_attribs = [{<<"authid">>, AuthID}, + {<<"xmlns:xmpp">>, ?NS_BOSH}, + {<<"xmlns:stream">>, ?NS_STREAM}] + ++ + case OutEls of + [] -> []; + _ -> [{<<"xmpp:version">>, Version}] + end, + MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY), + MaxPause = get_max_pause(To), + {200, ?HEADER, + xml:element_to_binary(#xmlel{name = <<"body">>, + attrs = + [{<<"xmlns">>, ?NS_HTTP_BIND}, + {<<"sid">>, Sid}, + {<<"wait">>, + iolist_to_binary(integer_to_list(Wait))}, + {<<"requests">>, + iolist_to_binary(integer_to_list(Hold + + + 1))}, + {<<"inactivity">>, + iolist_to_binary(integer_to_list(trunc(MaxInactivity + / + 1000)))}, + {<<"maxpause">>, + iolist_to_binary(integer_to_list(MaxPause))}, + {<<"polling">>, + iolist_to_binary(integer_to_list(trunc((?MIN_POLLING) + / + 1000000)))}, + {<<"ver">>, ?BOSH_VERSION}, + {<<"from">>, From}, + {<<"secure">>, <<"true">>}] + ++ BOSH_attribs, + children = OutEls})} + end; + _ -> + {200, ?HEADER, + <<"<body type='terminate' condition='internal-se" + "rver-error' xmlns='", + (?NS_HTTP_BIND)/binary, "'/>">>} end. - -http_get(#http_bind{pid = FsmRef, wait = Wait, hold = Hold}, Rid) -> - gen_fsm:sync_send_all_state_event( - FsmRef, {http_get, Rid, Wait, Hold}, 2 * ?MAX_WAIT * 1000). +http_get(#http_bind{pid = FsmRef, wait = Wait, + hold = Hold}, + Rid) -> + gen_fsm:sync_send_all_state_event(FsmRef, + {http_get, Rid, Wait, Hold}, + 2 * (?MAX_WAIT) * 1000). send_outpacket(#http_bind{pid = FsmRef}, OutPacket) -> case OutPacket of - [] -> - {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"}; - [{xmlstreamend, _}] -> - gen_fsm:sync_send_all_state_event(FsmRef,{stop,stream_closed}), - {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"}; - _ -> - %% TODO: We parse to add a default namespace to packet, - %% The spec says adding the jabber:client namespace if - %% mandatory, even if some implementation do not do that - %% change on packets. - %% I think this should be an option to avoid modifying - %% packet in most case. - AllElements = - lists:all(fun({xmlstreamelement, - {xmlelement, "stream:error", _, _}}) -> false; - ({xmlstreamelement, _}) -> true; - ({xmlstreamraw, _}) -> true; - (_) -> false - end, OutPacket), - case AllElements of - true -> - TypedEls = lists:foldl(fun({xmlstreamelement, El}, Acc) -> - Acc ++ - [xml:element_to_string( - check_default_xmlns(El) - )]; - ({xmlstreamraw, R}, Acc) -> - Acc ++ [R] - end, - [], - OutPacket), - - Body = "<body xmlns='"++?NS_HTTP_BIND++"'>" - ++ TypedEls ++ - "</body>", - ?DEBUG(" --- outgoing data --- ~n~s~n --- END --- ~n", - [Body]), - {200, ?HEADER, Body}; - false -> - case OutPacket of - [{xmlstreamstart, _, _} | SEls] -> - OutEls = - case SEls of - [{xmlstreamelement, - {xmlelement, - "stream:features", - StreamAttribs, StreamEls}} | - StreamTail] -> - TypedTail = - [check_default_xmlns(OEl) || - {xmlstreamelement, OEl} <- - StreamTail], - [{xmlelement, - "stream:features", - [{"xmlns:stream", - ?NS_STREAM}] ++ - StreamAttribs, StreamEls}] ++ - TypedTail; - StreamTail -> - [check_default_xmlns(OEl) || - {xmlstreamelement, OEl} <- - StreamTail] - end, - {200, ?HEADER, - xml:element_to_binary( - {xmlelement,"body", - [{"xmlns", - ?NS_HTTP_BIND}], - OutEls})}; + [] -> + {200, ?HEADER, + <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'/>">>}; + [{xmlstreamend, _}] -> + gen_fsm:sync_send_all_state_event(FsmRef, + {stop, stream_closed}), + {200, ?HEADER, + <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'/>">>}; + _ -> + AllElements = lists:all(fun ({xmlstreamelement, + #xmlel{name = <<"stream:error">>}}) -> + false; + ({xmlstreamelement, _}) -> true; + ({xmlstreamraw, _}) -> true; + (_) -> false + end, + OutPacket), + case AllElements of + true -> + TypedEls = lists:foldl(fun ({xmlstreamelement, El}, + Acc) -> + Acc ++ + [xml:element_to_binary(check_default_xmlns(El))]; + ({xmlstreamraw, R}, Acc) -> + Acc ++ [R] + end, + [], OutPacket), + Body = <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'>", + (iolist_to_binary(TypedEls))/binary, "</body>">>, + ?DEBUG(" --- outgoing data --- ~n~s~n --- END " + "--- ~n", + [Body]), + {200, ?HEADER, Body}; + false -> + case OutPacket of + [{xmlstreamstart, _, _} | SEls] -> + OutEls = case SEls of + [{xmlstreamelement, + #xmlel{name = <<"stream:features">>, + attrs = StreamAttribs, + children = StreamEls}} + | StreamTail] -> + TypedTail = [check_default_xmlns(OEl) + || {xmlstreamelement, OEl} + <- StreamTail], + [#xmlel{name = <<"stream:features">>, + attrs = + [{<<"xmlns:stream">>, + ?NS_STREAM}] + ++ StreamAttribs, + children = StreamEls}] + ++ TypedTail; + StreamTail -> + [check_default_xmlns(OEl) + || {xmlstreamelement, OEl} <- StreamTail] + end, + {200, ?HEADER, + xml:element_to_binary(#xmlel{name = <<"body">>, + attrs = + [{<<"xmlns">>, + ?NS_HTTP_BIND}], + children = OutEls})}; + _ -> + SErrCond = lists:filter(fun ({xmlstreamelement, + #xmlel{name = + <<"stream:error">>}}) -> + true; + (_) -> false + end, + OutPacket), + StreamErrCond = case SErrCond of + [] -> null; + [{xmlstreamelement, + #xmlel{} = StreamErrorTag} + | _] -> + [StreamErrorTag] + end, + gen_fsm:sync_send_all_state_event(FsmRef, + {stop, + {stream_error, + OutPacket}}), + case StreamErrCond of + null -> + {200, ?HEADER, + <<"<body type='terminate' condition='internal-se" + "rver-error' xmlns='", + (?NS_HTTP_BIND)/binary, "'/>">>}; _ -> - SErrCond = - lists:filter( - fun({xmlstreamelement, - {xmlelement, "stream:error", - _, _}}) -> - true; - (_) -> false - end, OutPacket), - StreamErrCond = - case SErrCond of - [] -> - null; - [{xmlstreamelement, - {xmlelement, _, _, _Cond} = - StreamErrorTag} | _] -> - [StreamErrorTag] - end, - gen_fsm:sync_send_all_state_event(FsmRef, - {stop, {stream_error,OutPacket}}), - case StreamErrCond of - null -> - {200, ?HEADER, - "<body type='terminate' " - "condition='internal-server-error' " - "xmlns='"++?NS_HTTP_BIND++"'/>"}; - _ -> - {200, ?HEADER, - "<body type='terminate' " - "condition='remote-stream-error' " - "xmlns='"++?NS_HTTP_BIND++"' " ++ - "xmlns:stream='"++?NS_STREAM++"'>" ++ - elements_to_string(StreamErrCond) ++ - "</body>"} - end - end - end + {200, ?HEADER, + <<"<body type='terminate' condition='remote-stre" + "am-error' xmlns='", + (?NS_HTTP_BIND)/binary, "' ", "xmlns:stream='", + (?NS_STREAM)/binary, "'>", + (elements_to_string(StreamErrCond))/binary, + "</body>">>} + end + end + end end. parse_request(Data, PayloadSize, MaxStanzaSize) -> - ?DEBUG("--- incoming data --- ~n~s~n --- END --- ", [Data]), - %% MR: I do not think it works if put put several elements in the - %% same body: + ?DEBUG("--- incoming data --- ~n~s~n --- END " + "--- ", + [Data]), case xml_stream:parse_element(Data) of - {xmlelement, "body", Attrs, Els} -> - Xmlns = xml:get_attr_s("xmlns",Attrs), - if - Xmlns /= ?NS_HTTP_BIND -> - {error, bad_request}; - true -> - case catch list_to_integer(xml:get_attr_s("rid", Attrs)) of - {'EXIT', _} -> - {error, bad_request}; - Rid -> - %% I guess this is to remove XMLCDATA: Is it really needed ? - FixedEls = - lists:filter( - fun(I) -> - case I of - {xmlelement, _, _, _} -> - true; - _ -> - false - end - end, Els), - Sid = xml:get_attr_s("sid",Attrs), - if - PayloadSize =< MaxStanzaSize -> - {ok, {Sid, Rid, Attrs, FixedEls}}; - true -> - {size_limit, Sid} - end - end - end; - {xmlelement, _Name, _Attrs, _Els} -> - {error, bad_request}; - {error, _Reason} -> - {error, bad_request} + #xmlel{name = <<"body">>, attrs = Attrs, + children = Els} -> + Xmlns = xml:get_attr_s(<<"xmlns">>, Attrs), + if Xmlns /= (?NS_HTTP_BIND) -> {error, bad_request}; + true -> + case catch + jlib:binary_to_integer(xml:get_attr_s(<<"rid">>, + Attrs)) + of + {'EXIT', _} -> {error, bad_request}; + Rid -> + FixedEls = lists:filter(fun (I) -> + case I of + #xmlel{} -> true; + _ -> false + end + end, + Els), + Sid = xml:get_attr_s(<<"sid">>, Attrs), + if PayloadSize =< MaxStanzaSize -> + {ok, {Sid, Rid, Attrs, FixedEls}}; + true -> {size_limit, Sid} + end + end + end; + #xmlel{} -> {error, bad_request}; + {error, _Reason} -> {error, bad_request} end. -send_receiver_reply(undefined, _Reply) -> - ok; +send_receiver_reply(undefined, _Reply) -> ok; send_receiver_reply(Receiver, Reply) -> gen_fsm:reply(Receiver, Reply). - %% Cancel timer and empty message queue. -cancel_timer(undefined) -> - ok; +cancel_timer(undefined) -> ok; cancel_timer(Timer) -> erlang:cancel_timer(Timer), - receive - {timeout, Timer, _} -> - ok - after 0 -> - ok - end. + receive {timeout, Timer, _} -> ok after 0 -> ok end. %% If client asked for a pause (pause > 0), we apply the pause value %% as inactivity timer: -set_inactivity_timer(Pause, _MaxInactivity) when Pause > 0 -> - erlang:start_timer(Pause*1000, self(), []); +set_inactivity_timer(Pause, _MaxInactivity) + when Pause > 0 -> + erlang:start_timer(Pause * 1000, self(), []); %% Otherwise, we apply the max_inactivity value as inactivity timer: set_inactivity_timer(_Pause, MaxInactivity) -> erlang:start_timer(MaxInactivity, self(), []). - %% TODO: Use tail recursion and list reverse ? -elements_to_string([]) -> - []; +elements_to_string([]) -> []; elements_to_string([El | Els]) -> - [xml:element_to_binary(El)|elements_to_string(Els)]. + [xml:element_to_binary(El) | elements_to_string(Els)]. %% @spec (To, Default::integer()) -> integer() %% where To = [] | {Host::string(), Version::string()} get_max_inactivity({Host, _}, Default) -> - case gen_mod:get_module_opt(Host, mod_http_bind, max_inactivity, undefined) of - Seconds when is_integer(Seconds) -> - Seconds * 1000; - undefined -> - Default + case gen_mod:get_module_opt(Host, mod_http_bind, max_inactivity, + fun(I) when is_integer(I), I>0 -> I end, + undefined) + of + Seconds when is_integer(Seconds) -> Seconds * 1000; + undefined -> Default end; -get_max_inactivity(_, Default) -> - Default. +get_max_inactivity(_, Default) -> Default. get_max_pause({Host, _}) -> - gen_mod:get_module_opt(Host, mod_http_bind, max_pause, ?MAX_PAUSE); -get_max_pause(_) -> - ?MAX_PAUSE. + gen_mod:get_module_opt(Host, mod_http_bind, max_pause, + fun(I) when is_integer(I), I>0 -> I end, + ?MAX_PAUSE); +get_max_pause(_) -> ?MAX_PAUSE. %% Current time as integer tnow() -> {TMegSec, TSec, TMSec} = now(), (TMegSec * 1000000 + TSec) * 1000000 + TMSec. -check_default_xmlns({xmlelement, Name, Attrs, Els} = El) -> - case xml:get_tag_attr_s("xmlns", El) of - "" -> {xmlelement, Name, [{"xmlns", ?NS_CLIENT} | Attrs], Els}; - _ -> El +check_default_xmlns(#xmlel{name = Name, attrs = Attrs, + children = Els} = + El) -> + case xml:get_tag_attr_s(<<"xmlns">>, El) of + <<"">> -> + #xmlel{name = Name, + attrs = [{<<"xmlns">>, ?NS_CLIENT} | Attrs], + children = Els}; + _ -> El end; -check_default_xmlns(El) -> - El. +check_default_xmlns(El) -> El. %% Check that mod_http_bind has been defined in config file. %% Print a warning in log file if this is not the case. check_bind_module(XmppDomain) -> case gen_mod:is_loaded(XmppDomain, mod_http_bind) of - true -> ok; - false -> ?ERROR_MSG("You are trying to use BOSH (HTTP Bind) in host ~p," - " but the module mod_http_bind is not started in" - " that host. Configure your BOSH client to connect" - " to the correct host, or add your desired host to" - " the configuration, or check your 'modules'" - " section in your ejabberd configuration file.", - [XmppDomain]) + true -> true; + false -> + ?ERROR_MSG("You are trying to use BOSH (HTTP Bind) " + "in host ~p, but the module mod_http_bind " + "is not started in that host. Configure " + "your BOSH client to connect to the correct " + "host, or add your desired host to the " + "configuration, or check your 'modules' " + "section in your ejabberd configuration " + "file.", + [XmppDomain]), + false end. |