diff options
Diffstat (limited to 'src/ejabberd_http_bind.erl')
-rw-r--r-- | src/ejabberd_http_bind.erl | 1216 |
1 files changed, 0 insertions, 1216 deletions
diff --git a/src/ejabberd_http_bind.erl b/src/ejabberd_http_bind.erl deleted file mode 100644 index 758c1cee5..000000000 --- a/src/ejabberd_http_bind.erl +++ /dev/null @@ -1,1216 +0,0 @@ -%%%---------------------------------------------------------------------- -%%% File : ejabberd_http_bind.erl -%%% Author : Stefan Strigler <steve@zeank.in-berlin.de> -%%% Purpose : Implements XMPP over BOSH (XEP-0206) -%%% Created : 21 Sep 2005 by Stefan Strigler <steve@zeank.in-berlin.de> -%%% Modified: may 2009 by Mickael Remond, Alexey Schepin -%%% -%%% -%%% ejabberd, Copyright (C) 2002-2016 ProcessOne -%%% -%%% This program is free software; you can redistribute it and/or -%%% modify it under the terms of the GNU General Public License as -%%% published by the Free Software Foundation; either version 2 of the -%%% License, or (at your option) any later version. -%%% -%%% This program is distributed in the hope that it will be useful, -%%% but WITHOUT ANY WARRANTY; without even the implied warranty of -%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -%%% General Public License for more details. -%%% -%%% You should have received a copy of the GNU General Public License along -%%% with this program; if not, write to the Free Software Foundation, Inc., -%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -%%% -%%%---------------------------------------------------------------------- - --module(ejabberd_http_bind). - --protocol({xep, 124, '1.11'}). --protocol({xep, 206, '1.4'}). - --behaviour(gen_fsm). - -%% External exports --export([start_link/4, - init/1, - handle_event/3, - handle_sync_event/4, - code_change/4, - handle_info/3, - terminate/3, - send/2, - send_xml/2, - sockname/1, - peername/1, - setopts/2, - controlling_process/2, - become_controller/2, - custom_receiver/1, - reset_stream/1, - change_shaper/2, - monitor/1, - close/1, - start/5, - handle_session_start/8, - handle_http_put/7, - http_put/7, - http_get/2, - prepare_response/4, - process_request/3]). - --include("ejabberd.hrl"). --include("logger.hrl"). - --include("jlib.hrl"). - --include("ejabberd_http.hrl"). - --include("http_bind.hrl"). - --record(http_bind, - {id, pid, to, hold, wait, process_delay, version}). - --define(NULL_PEER, {{0, 0, 0, 0}, 0}). - -%% http binding request --record(hbr, {rid, - key, - out}). - --record(state, {id, - rid = none, - key, - socket, - output = "", - input = queue:new(), - waiting_input = false, - shaper_state, - shaper_timer, - last_receiver, - last_poll, - http_receiver, - out_of_order_receiver = false, - wait_timer, - ctime = 0, - timer, - pause = 0, - unprocessed_req_list = [], % list of request that have been delayed for proper reordering: {Request, PID} - req_list = [], % list of requests (cache) - max_inactivity, - max_pause, - ip = ?NULL_PEER - }). - -%% Internal request format: --record(http_put, {rid, - attrs, - payload, - payload_size, - hold, - stream, - ip}). - -%%-define(DBGFSM, true). --ifdef(DBGFSM). - --define(FSMOPTS, [{debug, [trace]}]). - --else. - --define(FSMOPTS, []). - --endif. - -%% Wait 100ms before continue processing, to allow the client provide more related stanzas. --define(BOSH_VERSION, <<"1.8">>). - --define(NS_CLIENT, <<"jabber:client">>). - --define(NS_BOSH, <<"urn:xmpp:xbosh">>). - --define(NS_HTTP_BIND, - <<"http://jabber.org/protocol/httpbind">>). - --define(MAX_REQUESTS, 2). - --define(MIN_POLLING, 2000000). - --define(MAX_WAIT, 3600). - --define(MAX_INACTIVITY, 30000). - --define(MAX_PAUSE, 120). - --define(PROCESS_DELAY_DEFAULT, 100). - --define(PROCESS_DELAY_MIN, 0). - --define(PROCESS_DELAY_MAX, 1000). - --define(PROCNAME_MHB, ejabberd_mod_http_bind). - -start(XMPPDomain, Sid, Key, IP, HOpts) -> - ?DEBUG("Starting session", []), - case catch gen_fsm:start(?MODULE, - [Sid, Key, IP, HOpts], - ?FSMOPTS) - of - {ok, Pid} -> {ok, Pid}; - _ -> check_bind_module(XMPPDomain), - {error, "Cannot start HTTP bind session"} - end. - -start_link(Sid, Key, IP, HOpts) -> - gen_fsm:start_link(?MODULE, [Sid, Key, IP, HOpts], ?FSMOPTS). - -send({http_bind, FsmRef, _IP}, Packet) -> - gen_fsm:sync_send_all_state_event(FsmRef, - {send, Packet}). - -send_xml({http_bind, FsmRef, _IP}, Packet) -> - gen_fsm:sync_send_all_state_event(FsmRef, - {send_xml, Packet}). - -setopts({http_bind, FsmRef, _IP}, Opts) -> - case lists:member({active, once}, Opts) of - true -> - gen_fsm:send_all_state_event(FsmRef, {activate, self()}); - _ -> - ok - end. - -controlling_process(_Socket, _Pid) -> ok. - -custom_receiver({http_bind, FsmRef, _IP}) -> - {receiver, ?MODULE, FsmRef}. - -become_controller(FsmRef, C2SPid) -> - gen_fsm:send_all_state_event(FsmRef, - {become_controller, C2SPid}). - -reset_stream({http_bind, _FsmRef, _IP}) -> - ok. - -change_shaper({http_bind, FsmRef, _IP}, Shaper) -> - gen_fsm:send_all_state_event(FsmRef, - {change_shaper, Shaper}). - -monitor({http_bind, FsmRef, _IP}) -> - erlang:monitor(process, FsmRef). - -close({http_bind, FsmRef, _IP}) -> - catch gen_fsm:sync_send_all_state_event(FsmRef, - {stop, close}). - -sockname(_Socket) -> {ok, ?NULL_PEER}. - -peername({http_bind, _FsmRef, IP}) -> {ok, IP}. - - -%% Entry point for data coming from client through ejabberd HTTP server: -process_request(Data, IP, HOpts) -> - Opts1 = ejabberd_c2s_config:get_c2s_limits(), - Opts = [{xml_socket, true} | Opts1], - MaxStanzaSize = case lists:keysearch(max_stanza_size, 1, - Opts) - of - {value, {_, Size}} -> Size; - _ -> infinity - end, - PayloadSize = iolist_size(Data), - case catch parse_request(Data, PayloadSize, - MaxStanzaSize) - of - %% No existing session: - {ok, {<<"">>, Rid, Attrs, Payload}} -> - case fxml:get_attr_s(<<"to">>, Attrs) of - <<"">> -> - ?DEBUG("Session not created (Improper addressing)", []), - {200, ?HEADER, - <<"<body type='terminate' condition='improper-ad" - "dressing' xmlns='", - (?NS_HTTP_BIND)/binary, "'/>">>}; - XmppDomain -> - NXmppDomain = jid:nameprep(XmppDomain), - Sid = p1_sha:sha(term_to_binary({p1_time_compat:monotonic_time(), make_ref()})), - case start(NXmppDomain, Sid, <<"">>, IP, HOpts) of - {error, _} -> - {500, ?HEADER, - <<"<body type='terminate' condition='internal-se" - "rver-error' xmlns='", - (?NS_HTTP_BIND)/binary, - "'>Internal Server Error</body>">>}; - {ok, Pid} -> - handle_session_start(Pid, NXmppDomain, Sid, Rid, Attrs, - Payload, PayloadSize, IP) - end - end; - %% Existing session - {ok, {Sid, Rid, Attrs, Payload1}} -> - StreamStart = case fxml:get_attr_s(<<"xmpp:restart">>, - Attrs) - of - <<"true">> -> true; - _ -> false - end, - Payload2 = case fxml:get_attr_s(<<"type">>, Attrs) of - <<"terminate">> -> - Payload1 ++ [{xmlstreamend, <<"stream:stream">>}]; - _ -> Payload1 - end, - handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize, - StreamStart, IP); - {size_limit, Sid} -> - case mnesia:dirty_read({http_bind, Sid}) of - {error, _} -> {404, ?HEADER, <<"">>}; - {ok, #http_bind{pid = FsmRef}} -> - gen_fsm:sync_send_all_state_event(FsmRef, - {stop, close}), - {200, ?HEADER, - <<"<body type='terminate' condition='undefined-c" - "ondition' xmlns='", - (?NS_HTTP_BIND)/binary, "'>Request Too Large</body>">>} - end; - _ -> - ?DEBUG("Received bad request: ~p", [Data]), - {400, ?HEADER, <<"">>} - end. - -handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, - Payload, PayloadSize, IP) -> - ?DEBUG("got pid: ~p", [Pid]), - Wait = case str:to_integer(fxml:get_attr_s(<<"wait">>, - Attrs)) - of - {error, _} -> ?MAX_WAIT; - {CWait, _} -> - if CWait > (?MAX_WAIT) -> ?MAX_WAIT; - true -> CWait - end - end, - Hold = case str:to_integer(fxml:get_attr_s(<<"hold">>, - Attrs)) - of - {error, _} -> (?MAX_REQUESTS) - 1; - {CHold, _} -> - if CHold > (?MAX_REQUESTS) - 1 -> (?MAX_REQUESTS) - 1; - true -> CHold - end - end, - Pdelay = case - str:to_integer(fxml:get_attr_s(<<"process-delay">>, - Attrs)) - of - {error, _} -> ?PROCESS_DELAY_DEFAULT; - {CPdelay, _} - when ((?PROCESS_DELAY_MIN) =< CPdelay) and - (CPdelay =< (?PROCESS_DELAY_MAX)) -> - CPdelay; - {CPdelay, _} -> - lists:max([lists:min([CPdelay, ?PROCESS_DELAY_MAX]), - ?PROCESS_DELAY_MIN]) - end, - Version = case catch - list_to_float(binary_to_list(fxml:get_attr_s(<<"ver">>, Attrs))) - of - {'EXIT', _} -> 0.0; - V -> V - end, - XmppVersion = fxml:get_attr_s(<<"xmpp:version">>, Attrs), - ?DEBUG("Create session: ~p", [Sid]), - mnesia:dirty_write( - #http_bind{id = Sid, - pid = Pid, - to = {XmppDomain, - XmppVersion}, - hold = Hold, - wait = Wait, - process_delay = Pdelay, - version = Version - }), - handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, true, IP). - -%%%---------------------------------------------------------------------- -%%% Callback functions from gen_fsm -%%%---------------------------------------------------------------------- - -init([Sid, Key, IP, HOpts]) -> - ?DEBUG("started: ~p", [{Sid, Key, IP}]), - Opts1 = ejabberd_c2s_config:get_c2s_limits(), - SOpts = lists:filtermap(fun({stream_management, _}) -> true; - ({max_ack_queue, _}) -> true; - ({resume_timeout, _}) -> true; - ({max_resume_timeout, _}) -> true; - ({resend_on_timeout, _}) -> true; - (_) -> false - end, HOpts), - - Opts = [{xml_socket, true} | SOpts ++ Opts1], - Shaper = none, - ShaperState = shaper:new(Shaper), - Socket = {http_bind, self(), IP}, - ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, Opts), - Timer = erlang:start_timer(?MAX_INACTIVITY, self(), []), - {ok, loop, #state{id = Sid, - key = Key, - socket = Socket, - shaper_state = ShaperState, - max_inactivity = ?MAX_INACTIVITY, - max_pause = ?MAX_PAUSE, - timer = Timer}}. - -handle_event({become_controller, C2SPid}, StateName, StateData) -> - case StateData#state.input of - cancel -> - {next_state, StateName, - StateData#state{waiting_input = C2SPid}}; - Input -> - lists:foreach(fun (Event) -> C2SPid ! Event end, - queue:to_list(Input)), - {next_state, StateName, - StateData#state{input = queue:new(), - waiting_input = C2SPid}} - end; -handle_event({change_shaper, Shaper}, StateName, - StateData) -> - NewShaperState = shaper:new(Shaper), - {next_state, StateName, - StateData#state{shaper_state = NewShaperState}}; -handle_event(_Event, StateName, StateData) -> - {next_state, StateName, StateData}. - -handle_sync_event({send_xml, Packet}, _From, StateName, - #state{http_receiver = undefined} = StateData) -> - Output = [Packet | StateData#state.output], - Reply = ok, - {reply, Reply, StateName, - StateData#state{output = Output}}; -handle_sync_event({send_xml, Packet}, _From, StateName, - #state{out_of_order_receiver = true} = StateData) -> - Output = [Packet | StateData#state.output], - Reply = ok, - {reply, Reply, StateName, - StateData#state{output = Output}}; -handle_sync_event({send_xml, Packet}, _From, StateName, - StateData) -> - Output = [Packet | StateData#state.output], - cancel_timer(StateData#state.timer), - Timer = set_inactivity_timer(StateData#state.pause, - StateData#state.max_inactivity), - HTTPReply = {ok, Output}, - gen_fsm:reply(StateData#state.http_receiver, HTTPReply), - cancel_timer(StateData#state.wait_timer), - Rid = StateData#state.rid, - ReqList = [#hbr{rid = Rid, key = StateData#state.key, - out = Output} - | [El - || El <- StateData#state.req_list, El#hbr.rid /= Rid]], - Reply = ok, - {reply, Reply, StateName, - StateData#state{output = [], http_receiver = undefined, - req_list = ReqList, wait_timer = undefined, - timer = Timer}}; - -handle_sync_event({stop,close}, _From, _StateName, StateData) -> - Reply = ok, - {stop, normal, Reply, StateData}; -handle_sync_event({stop,stream_closed}, _From, _StateName, StateData) -> - Reply = ok, - {stop, normal, Reply, StateData}; -handle_sync_event({stop,Reason}, _From, _StateName, StateData) -> - ?DEBUG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]), - Reply = ok, - {stop, normal, Reply, StateData}; -%% HTTP PUT: Receive packets from the client -handle_sync_event(#http_put{rid = Rid}, _From, - StateName, StateData) - when StateData#state.shaper_timer /= undefined -> - Pause = case - erlang:read_timer(StateData#state.shaper_timer) - of - false -> 0; - P -> P - end, - Reply = {wait, Pause}, - ?DEBUG("Shaper timer for RID ~p: ~p", [Rid, Reply]), - {reply, Reply, StateName, StateData}; -handle_sync_event(#http_put{payload_size = - PayloadSize} = - Request, - _From, StateName, StateData) -> - ?DEBUG("New request: ~p", [Request]), - {NewShaperState, NewShaperTimer} = - update_shaper(StateData#state.shaper_state, - PayloadSize), - handle_http_put_event(Request, StateName, - StateData#state{shaper_state = NewShaperState, - shaper_timer = NewShaperTimer}); -%% HTTP GET: send packets to the client -handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) -> - TNow = p1_time_compat:system_time(micro_seconds), - if (Hold > 0) and - ((StateData#state.output == []) or (StateData#state.rid < Rid)) and - ((TNow - StateData#state.ctime) < (Wait*1000*1000)) and - (StateData#state.rid =< Rid) and - (StateData#state.pause == 0) -> - send_receiver_reply(StateData#state.http_receiver, {ok, empty}), - cancel_timer(StateData#state.wait_timer), - WaitTimer = erlang:start_timer(Wait * 1000, self(), []), - cancel_timer(StateData#state.timer), - {next_state, StateName, StateData#state{ - http_receiver = From, - out_of_order_receiver = StateData#state.rid < Rid, - wait_timer = WaitTimer, - timer = undefined}}; - true -> - cancel_timer(StateData#state.timer), - Reply = {ok, StateData#state.output}, - ReqList = [#hbr{rid = Rid, - key = StateData#state.key, - out = StateData#state.output} - | [El - || El <- StateData#state.req_list, - El#hbr.rid /= Rid]], - if (StateData#state.http_receiver /= undefined) and - StateData#state.out_of_order_receiver -> - {reply, Reply, StateName, - StateData#state{output = [], timer = undefined, - req_list = ReqList, - out_of_order_receiver = false}}; - true -> - send_receiver_reply(StateData#state.http_receiver, {ok, empty}), - cancel_timer(StateData#state.wait_timer), - Timer = set_inactivity_timer(StateData#state.pause, - StateData#state.max_inactivity), - {reply, Reply, StateName, - StateData#state{output = [], - http_receiver = undefined, - wait_timer = undefined, - timer = Timer, - req_list = ReqList}} - end - end; -handle_sync_event(peername, _From, StateName, - StateData) -> - Reply = {ok, StateData#state.ip}, - {reply, Reply, StateName, StateData}; -handle_sync_event(_Event, _From, StateName, - StateData) -> - Reply = ok, {reply, Reply, StateName, StateData}. - -code_change(_OldVsn, StateName, StateData, _Extra) -> - {ok, StateName, StateData}. - -handle_info({timeout, Timer, _}, _StateName, - #state{id = SID, timer = Timer} = StateData) -> - ?INFO_MSG("Session timeout. Closing the HTTP bind " - "session: ~p", - [SID]), - {stop, normal, StateData}; -handle_info({timeout, WaitTimer, _}, StateName, - #state{wait_timer = WaitTimer} = StateData) -> - if StateData#state.http_receiver /= undefined -> - cancel_timer(StateData#state.timer), - Timer = set_inactivity_timer(StateData#state.pause, - StateData#state.max_inactivity), - gen_fsm:reply(StateData#state.http_receiver, - {ok, empty}), - Rid = StateData#state.rid, - ReqList = [#hbr{rid = Rid, key = StateData#state.key, - out = []} - | [El - || El <- StateData#state.req_list, El#hbr.rid /= Rid]], - {next_state, StateName, - StateData#state{http_receiver = undefined, - req_list = ReqList, wait_timer = undefined, - timer = Timer}}; - true -> {next_state, StateName, StateData} - end; -handle_info({timeout, ShaperTimer, _}, StateName, - #state{shaper_timer = ShaperTimer} = StateData) -> - {next_state, StateName, StateData#state{shaper_timer = undefined}}; - -handle_info(_, StateName, StateData) -> - {next_state, StateName, StateData}. - -terminate(_Reason, _StateName, StateData) -> - ?DEBUG("terminate: Deleting session ~s", - [StateData#state.id]), - mnesia:dirty_delete({http_bind, StateData#state.id}), - send_receiver_reply(StateData#state.http_receiver, - {ok, terminate}), - case StateData#state.waiting_input of - false -> ok; - C2SPid -> gen_fsm:send_event(C2SPid, closed) - end, - ok. - -%%%---------------------------------------------------------------------- -%%% Internal functions -%%%---------------------------------------------------------------------- - -%% PUT / Get processing: -handle_http_put_event(#http_put{rid = Rid, - attrs = Attrs, hold = Hold} = - Request, - StateName, StateData) -> - ?DEBUG("New request: ~p", [Request]), - RidAllow = rid_allow(StateData#state.rid, Rid, Attrs, - Hold, StateData#state.max_pause), - case RidAllow of - buffer -> - ?DEBUG("Buffered request: ~p", [Request]), - PendingRequests = StateData#state.unprocessed_req_list, - Requests = lists:keydelete(Rid, 2, PendingRequests), - ReqList = [#hbr{rid = Rid, key = StateData#state.key, - out = []} - | [El - || El <- StateData#state.req_list, - El#hbr.rid > Rid - 1 - Hold]], - ?DEBUG("reqlist: ~p", [ReqList]), - UnprocessedReqList = [Request | Requests], - cancel_timer(StateData#state.timer), - Timer = set_inactivity_timer(0, - StateData#state.max_inactivity), - {reply, ok, StateName, - StateData#state{unprocessed_req_list = - UnprocessedReqList, - req_list = ReqList, timer = Timer}, - hibernate}; - _ -> - process_http_put(Request, StateName, StateData, - RidAllow) - end. - -process_http_put(#http_put{rid = Rid, attrs = Attrs, - payload = Payload, hold = Hold, stream = StreamTo, - ip = IP} = - Request, - StateName, StateData, RidAllow) -> - ?DEBUG("Actually processing request: ~p", [Request]), - Key = fxml:get_attr_s(<<"key">>, Attrs), - NewKey = fxml:get_attr_s(<<"newkey">>, Attrs), - KeyAllow = case RidAllow of - repeat -> true; - false -> false; - {true, _} -> - case StateData#state.key of - <<"">> -> true; - OldKey -> - NextKey = p1_sha:sha(Key), - ?DEBUG("Key/OldKey/NextKey: ~s/~s/~s", - [Key, OldKey, NextKey]), - if OldKey == NextKey -> true; - true -> ?DEBUG("wrong key: ~s", [Key]), false - end - end - end, - TNow = p1_time_compat:system_time(micro_seconds), - LastPoll = if Payload == [] -> TNow; - true -> 0 - end, - if (Payload == []) and (Hold == 0) and - (TNow - StateData#state.last_poll < (?MIN_POLLING)) -> - Reply = {error, polling_too_frequently}, - {reply, Reply, StateName, StateData}; - KeyAllow -> - case RidAllow of - false -> - Reply = {error, not_exists}, - {reply, Reply, StateName, StateData}; - repeat -> - ?DEBUG("REPEATING ~p", [Rid]), - case [El#hbr.out - || El <- StateData#state.req_list, El#hbr.rid == Rid] - of - [] -> {error, not_exists}; - [Out | _XS] -> - if (Rid == StateData#state.rid) and - (StateData#state.http_receiver /= undefined) -> - {reply, ok, StateName, StateData}; - true -> - Reply = {repeat, lists:reverse(Out)}, - {reply, Reply, StateName, - StateData#state{last_poll = LastPoll}} - end - end; - {true, Pause} -> - SaveKey = if NewKey == <<"">> -> Key; - true -> NewKey - end, - ?DEBUG(" -- SaveKey: ~s~n", [SaveKey]), - ReqList1 = [El - || El <- StateData#state.req_list, - El#hbr.rid > Rid - 1 - Hold], - ReqList = case lists:keymember(Rid, #hbr.rid, ReqList1) - of - true -> ReqList1; - false -> - [#hbr{rid = Rid, key = StateData#state.key, - out = []} - | ReqList1] - end, - ?DEBUG("reqlist: ~p", [ReqList]), - cancel_timer(StateData#state.timer), - Timer = set_inactivity_timer(Pause, - StateData#state.max_inactivity), - case StateData#state.waiting_input of - false -> - Input = lists:foldl(fun queue:in/2, - StateData#state.input, Payload), - Reply = ok, - process_buffered_request(Reply, StateName, - StateData#state{input = Input, - rid = Rid, - key = SaveKey, - ctime = TNow, - timer = Timer, - pause = Pause, - last_poll = - LastPoll, - req_list = - ReqList, - ip = IP}); - C2SPid -> - case StreamTo of - {To, <<"">>} -> - gen_fsm:send_event(C2SPid, - {xmlstreamstart, - <<"stream:stream">>, - [{<<"to">>, To}, - {<<"xmlns">>, ?NS_CLIENT}, - {<<"xmlns:stream">>, - ?NS_STREAM}]}); - {To, Version} -> - gen_fsm:send_event(C2SPid, - {xmlstreamstart, - <<"stream:stream">>, - [{<<"to">>, To}, - {<<"xmlns">>, ?NS_CLIENT}, - {<<"version">>, Version}, - {<<"xmlns:stream">>, - ?NS_STREAM}]}); - _ -> ok - end, - MaxInactivity = get_max_inactivity(StreamTo, - StateData#state.max_inactivity), - MaxPause = get_max_inactivity(StreamTo, - StateData#state.max_pause), - ?DEBUG("really sending now: ~p", [Payload]), - lists:foreach(fun ({xmlstreamend, End}) -> - gen_fsm:send_event(C2SPid, - {xmlstreamend, - End}); - (El) -> - gen_fsm:send_event(C2SPid, - {xmlstreamelement, - El}) - end, - Payload), - Reply = ok, - process_buffered_request(Reply, StateName, - StateData#state{input = - queue:new(), - rid = Rid, - key = SaveKey, - ctime = TNow, - timer = Timer, - pause = Pause, - last_poll = - LastPoll, - req_list = - ReqList, - max_inactivity = - MaxInactivity, - max_pause = - MaxPause, - ip = IP}) - end - end; - true -> - Reply = {error, bad_key}, - {reply, Reply, StateName, StateData} - end. - -process_buffered_request(Reply, StateName, StateData) -> - Rid = StateData#state.rid, - Requests = StateData#state.unprocessed_req_list, - case lists:keysearch(Rid + 1, 2, Requests) of - {value, Request} -> - ?DEBUG("Processing buffered request: ~p", [Request]), - NewRequests = lists:keydelete(Rid + 1, 2, Requests), - handle_http_put_event(Request, StateName, - StateData#state{unprocessed_req_list = - NewRequests}); - _ -> {reply, Reply, StateName, StateData, hibernate} - end. - -handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, - StreamStart, IP) -> - case http_put(Sid, Rid, Attrs, Payload, PayloadSize, - StreamStart, IP) - of - {error, not_exists} -> - ?DEBUG("no session associated with sid: ~p", [Sid]), - {404, ?HEADER, <<"">>}; - {{error, Reason}, Sess} -> - ?DEBUG("Error on HTTP put. Reason: ~p", [Reason]), - handle_http_put_error(Reason, Sess); - {{repeat, OutPacket}, Sess} -> - ?DEBUG("http_put said 'repeat!' ...~nOutPacket: ~p", - [OutPacket]), - send_outpacket(Sess, OutPacket); - {{wait, Pause}, _Sess} -> - ?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]), - timer:sleep(Pause), - handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, - StreamStart, IP); - {ok, Sess} -> - prepare_response(Sess, Rid, [], StreamStart) - end. - -http_put(Sid, Rid, Attrs, Payload, PayloadSize, - StreamStart, IP) -> - ?DEBUG("Looking for session: ~p", [Sid]), - case mnesia:dirty_read({http_bind, Sid}) of - [] -> - {error, not_exists}; - [#http_bind{pid = FsmRef, hold=Hold, - to= {To, StreamVersion}} = Sess] -> - NewStream = case StreamStart of - true -> {To, StreamVersion}; - _ -> <<"">> - end, - {gen_fsm:sync_send_all_state_event( - FsmRef, #http_put{rid = Rid, - attrs = Attrs, - payload = Payload, - payload_size = PayloadSize, - hold = Hold, - stream = NewStream, - ip = IP}, - 30000), Sess} - end. - -handle_http_put_error(Reason, - #http_bind{pid = FsmRef, version = Version}) - when Version >= 0 -> - gen_fsm:sync_send_all_state_event(FsmRef, - {stop, {put_error, Reason}}), - case Reason of - not_exists -> - {200, ?HEADER, - fxml:element_to_binary(#xmlel{name = <<"body">>, - attrs = - [{<<"xmlns">>, ?NS_HTTP_BIND}, - {<<"type">>, <<"terminate">>}, - {<<"condition">>, - <<"item-not-found">>}], - children = []})}; - bad_key -> - {200, ?HEADER, - fxml:element_to_binary(#xmlel{name = <<"body">>, - attrs = - [{<<"xmlns">>, ?NS_HTTP_BIND}, - {<<"type">>, <<"terminate">>}, - {<<"condition">>, - <<"item-not-found">>}], - children = []})}; - polling_too_frequently -> - {200, ?HEADER, - fxml:element_to_binary(#xmlel{name = <<"body">>, - attrs = - [{<<"xmlns">>, ?NS_HTTP_BIND}, - {<<"type">>, <<"terminate">>}, - {<<"condition">>, - <<"policy-violation">>}], - children = []})} - end; -handle_http_put_error(Reason, - #http_bind{pid = FsmRef}) -> - gen_fsm:sync_send_all_state_event(FsmRef, - {stop, {put_error_no_version, Reason}}), - case Reason of - not_exists -> %% bad rid - ?DEBUG("Closing HTTP bind session (Bad rid).", []), - {404, ?HEADER, <<"">>}; - bad_key -> - ?DEBUG("Closing HTTP bind session (Bad key).", []), - {404, ?HEADER, <<"">>}; - polling_too_frequently -> - ?DEBUG("Closing HTTP bind session (User polling " - "too frequently).", - []), - {403, ?HEADER, <<"">>} - end. - -%% Control RID ordering -rid_allow(none, _NewRid, _Attrs, _Hold, _MaxPause) -> - {true, 0}; -rid_allow(OldRid, NewRid, Attrs, Hold, MaxPause) -> - ?DEBUG("Previous rid / New rid: ~p/~p", - [OldRid, NewRid]), - if - %% We did not miss any packet, we can process it immediately: - NewRid == OldRid + 1 -> - case catch - jlib:binary_to_integer(fxml:get_attr_s(<<"pause">>, - Attrs)) - of - {'EXIT', _} -> {true, 0}; - Pause1 when Pause1 =< MaxPause -> - ?DEBUG("got pause: ~p", [Pause1]), {true, Pause1}; - _ -> {true, 0} - end; - %% We have missed packets, we need to cached it to process it later on: - (OldRid < NewRid) and (NewRid =< OldRid + Hold + 1) -> - buffer; - (NewRid =< OldRid) and (NewRid > OldRid - Hold - 1) -> - repeat; - true -> false - end. - -update_shaper(ShaperState, PayloadSize) -> - {NewShaperState, Pause} = shaper:update(ShaperState, - PayloadSize), - if Pause > 0 -> - ShaperTimer = erlang:start_timer(Pause, self(), - activate), - {NewShaperState, ShaperTimer}; - true -> {NewShaperState, undefined} - end. - -prepare_response(Sess, Rid, OutputEls, StreamStart) -> - receive after Sess#http_bind.process_delay -> ok end, - case catch http_get(Sess, Rid) of - {ok, cancel} -> - {200, ?HEADER, - <<"<body type='error' xmlns='", (?NS_HTTP_BIND)/binary, - "'/>">>}; - {ok, empty} -> - {200, ?HEADER, - <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'/>">>}; - {ok, terminate} -> - {200, ?HEADER, - <<"<body type='terminate' xmlns='", - (?NS_HTTP_BIND)/binary, "'/>">>}; - {ok, ROutPacket} -> - OutPacket = lists:reverse(ROutPacket), - ?DEBUG("OutPacket: ~p", [OutputEls ++ OutPacket]), - prepare_outpacket_response(Sess, Rid, - OutputEls ++ OutPacket, StreamStart); - {'EXIT', {shutdown, _}} -> - {200, ?HEADER, - <<"<body type='terminate' condition='system-shut" - "down' xmlns='", - (?NS_HTTP_BIND)/binary, "'/>">>}; - {'EXIT', _Reason} -> - {200, ?HEADER, - <<"<body type='terminate' xmlns='", - (?NS_HTTP_BIND)/binary, "'/>">>} - end. - -%% Send output payloads on establised sessions -prepare_outpacket_response(Sess, _Rid, OutPacket, - false) -> - case catch send_outpacket(Sess, OutPacket) of - {'EXIT', _Reason} -> - ?DEBUG("Error in sending packet ~p ", [_Reason]), - {200, ?HEADER, - <<"<body type='terminate' xmlns='", - (?NS_HTTP_BIND)/binary, "'/>">>}; - SendRes -> SendRes - end; -%% Handle a new session along with its output payload -prepare_outpacket_response(#http_bind{id = Sid, - wait = Wait, hold = Hold, to = To} = - _Sess, - _Rid, OutPacket, true) -> - case OutPacket of - [{xmlstreamstart, _, OutAttrs} | Els] -> - AuthID = fxml:get_attr_s(<<"id">>, OutAttrs), - From = fxml:get_attr_s(<<"from">>, OutAttrs), - Version = fxml:get_attr_s(<<"version">>, OutAttrs), - OutEls = case Els of - [] -> []; - [{xmlstreamelement, - #xmlel{name = <<"stream:features">>, - attrs = StreamAttribs, children = StreamEls}} - | StreamTail] -> - TypedTail = [check_default_xmlns(OEl) - || {xmlstreamelement, OEl} <- StreamTail], - [#xmlel{name = <<"stream:features">>, - attrs = - [{<<"xmlns:stream">>, ?NS_STREAM}] ++ - StreamAttribs, - children = StreamEls}] - ++ TypedTail; - StreamTail -> - [check_default_xmlns(OEl) - || {xmlstreamelement, OEl} <- StreamTail] - end, - case OutEls of - [#xmlel{name = <<"stream:error">>}] -> - {200, ?HEADER, - <<"<body type='terminate' condition='host-unknow" - "n' xmlns='", - (?NS_HTTP_BIND)/binary, "'/>">>}; - _ -> - BOSH_attribs = [{<<"authid">>, AuthID}, - {<<"xmlns:xmpp">>, ?NS_BOSH}, - {<<"xmlns:stream">>, ?NS_STREAM}] - ++ - case OutEls of - [] -> []; - _ -> [{<<"xmpp:version">>, Version}] - end, - MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY), - MaxPause = get_max_pause(To), - {200, ?HEADER, - fxml:element_to_binary(#xmlel{name = <<"body">>, - attrs = - [{<<"xmlns">>, ?NS_HTTP_BIND}, - {<<"sid">>, Sid}, - {<<"wait">>, - iolist_to_binary(integer_to_list(Wait))}, - {<<"requests">>, - iolist_to_binary(integer_to_list(Hold - + - 1))}, - {<<"inactivity">>, - iolist_to_binary(integer_to_list(trunc(MaxInactivity - / - 1000)))}, - {<<"maxpause">>, - iolist_to_binary(integer_to_list(MaxPause))}, - {<<"polling">>, - iolist_to_binary(integer_to_list(trunc((?MIN_POLLING) - / - 1000000)))}, - {<<"ver">>, ?BOSH_VERSION}, - {<<"from">>, From}, - {<<"secure">>, <<"true">>}] - ++ BOSH_attribs, - children = OutEls})} - end; - _ -> - {200, ?HEADER, - <<"<body type='terminate' condition='internal-se" - "rver-error' xmlns='", - (?NS_HTTP_BIND)/binary, "'/>">>} - end. - -http_get(#http_bind{pid = FsmRef, wait = Wait, - hold = Hold}, - Rid) -> - gen_fsm:sync_send_all_state_event(FsmRef, - {http_get, Rid, Wait, Hold}, - 2 * (?MAX_WAIT) * 1000). - -send_outpacket(#http_bind{pid = FsmRef}, OutPacket) -> - case OutPacket of - [] -> - {200, ?HEADER, - <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'/>">>}; - [{xmlstreamend, _}] -> - gen_fsm:sync_send_all_state_event(FsmRef, - {stop, stream_closed}), - {200, ?HEADER, - <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'/>">>}; - _ -> - AllElements = lists:all(fun ({xmlstreamelement, - #xmlel{name = <<"stream:error">>}}) -> - false; - ({xmlstreamelement, _}) -> true; - ({xmlstreamraw, _}) -> true; - (_) -> false - end, - OutPacket), - case AllElements of - true -> - TypedEls = lists:foldl(fun ({xmlstreamelement, El}, - Acc) -> - Acc ++ - [fxml:element_to_binary(check_default_xmlns(El))]; - ({xmlstreamraw, R}, Acc) -> - Acc ++ [R] - end, - [], OutPacket), - Body = <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'>", - (iolist_to_binary(TypedEls))/binary, "</body>">>, - ?DEBUG(" --- outgoing data --- ~n~s~n --- END " - "--- ~n", - [Body]), - {200, ?HEADER, Body}; - false -> - case OutPacket of - [{xmlstreamstart, _, _} | SEls] -> - OutEls = case SEls of - [{xmlstreamelement, - #xmlel{name = <<"stream:features">>, - attrs = StreamAttribs, - children = StreamEls}} - | StreamTail] -> - TypedTail = [check_default_xmlns(OEl) - || {xmlstreamelement, OEl} - <- StreamTail], - [#xmlel{name = <<"stream:features">>, - attrs = - [{<<"xmlns:stream">>, - ?NS_STREAM}] - ++ StreamAttribs, - children = StreamEls}] - ++ TypedTail; - StreamTail -> - [check_default_xmlns(OEl) - || {xmlstreamelement, OEl} <- StreamTail] - end, - {200, ?HEADER, - fxml:element_to_binary(#xmlel{name = <<"body">>, - attrs = - [{<<"xmlns">>, - ?NS_HTTP_BIND}], - children = OutEls})}; - _ -> - SErrCond = lists:filter(fun ({xmlstreamelement, - #xmlel{name = - <<"stream:error">>}}) -> - true; - (_) -> false - end, - OutPacket), - StreamErrCond = case SErrCond of - [] -> null; - [{xmlstreamelement, - #xmlel{} = StreamErrorTag} - | _] -> - [StreamErrorTag] - end, - gen_fsm:sync_send_all_state_event(FsmRef, - {stop, - {stream_error, - OutPacket}}), - case StreamErrCond of - null -> - {200, ?HEADER, - <<"<body type='terminate' condition='internal-se" - "rver-error' xmlns='", - (?NS_HTTP_BIND)/binary, "'/>">>}; - _ -> - {200, ?HEADER, - <<"<body type='terminate' condition='remote-stre" - "am-error' xmlns='", - (?NS_HTTP_BIND)/binary, "' ", "xmlns:stream='", - (?NS_STREAM)/binary, "'>", - (elements_to_string(StreamErrCond))/binary, - "</body>">>} - end - end - end - end. - -parse_request(Data, PayloadSize, MaxStanzaSize) -> - ?DEBUG("--- incoming data --- ~n~s~n --- END " - "--- ", - [Data]), - case fxml_stream:parse_element(Data) of - #xmlel{name = <<"body">>, attrs = Attrs, - children = Els} -> - Xmlns = fxml:get_attr_s(<<"xmlns">>, Attrs), - if Xmlns /= (?NS_HTTP_BIND) -> {error, bad_request}; - true -> - case catch - jlib:binary_to_integer(fxml:get_attr_s(<<"rid">>, - Attrs)) - of - {'EXIT', _} -> {error, bad_request}; - Rid -> - FixedEls = lists:filter(fun (I) -> - case I of - #xmlel{} -> true; - _ -> false - end - end, - Els), - Sid = fxml:get_attr_s(<<"sid">>, Attrs), - if PayloadSize =< MaxStanzaSize -> - {ok, {Sid, Rid, Attrs, FixedEls}}; - true -> {size_limit, Sid} - end - end - end; - #xmlel{} -> {error, bad_request}; - {error, _Reason} -> {error, bad_request} - end. - -send_receiver_reply(undefined, _Reply) -> ok; -send_receiver_reply(Receiver, Reply) -> - gen_fsm:reply(Receiver, Reply). - -%% Cancel timer and empty message queue. -cancel_timer(undefined) -> ok; -cancel_timer(Timer) -> - erlang:cancel_timer(Timer), - receive {timeout, Timer, _} -> ok after 0 -> ok end. - -%% If client asked for a pause (pause > 0), we apply the pause value -%% as inactivity timer: -set_inactivity_timer(Pause, _MaxInactivity) - when Pause > 0 -> - erlang:start_timer(Pause * 1000, self(), []); -%% Otherwise, we apply the max_inactivity value as inactivity timer: -set_inactivity_timer(_Pause, MaxInactivity) -> - erlang:start_timer(MaxInactivity, self(), []). - -elements_to_string([]) -> []; -elements_to_string([El | Els]) -> - [fxml:element_to_binary(El) | elements_to_string(Els)]. - -%% @spec (To, Default::integer()) -> integer() -%% where To = [] | {Host::string(), Version::string()} -get_max_inactivity({Host, _}, Default) -> - case gen_mod:get_module_opt(Host, mod_http_bind, max_inactivity, - fun(I) when is_integer(I), I>0 -> I end, - undefined) - of - Seconds when is_integer(Seconds) -> Seconds * 1000; - undefined -> Default - end; -get_max_inactivity(_, Default) -> Default. - -get_max_pause({Host, _}) -> - gen_mod:get_module_opt(Host, mod_http_bind, max_pause, - fun(I) when is_integer(I), I>0 -> I end, - ?MAX_PAUSE); -get_max_pause(_) -> ?MAX_PAUSE. - -check_default_xmlns(#xmlel{name = Name, attrs = Attrs, - children = Els} = - El) -> - case fxml:get_tag_attr_s(<<"xmlns">>, El) of - <<"">> -> - #xmlel{name = Name, - attrs = [{<<"xmlns">>, ?NS_CLIENT} | Attrs], - children = Els}; - _ -> El - end; -check_default_xmlns(El) -> El. - -%% Check that mod_http_bind has been defined in config file. -%% Print a warning in log file if this is not the case. -check_bind_module(XmppDomain) -> - case gen_mod:is_loaded(XmppDomain, mod_http_bind) of - true -> true; - false -> - ?ERROR_MSG("You are trying to use BOSH (HTTP Bind) " - "in host ~p, but the module mod_http_bind " - "is not started in that host. Configure " - "your BOSH client to connect to the correct " - "host, or add your desired host to the " - "configuration, or check your 'modules' " - "section in your ejabberd configuration " - "file.", - [XmppDomain]), - false - end. |