aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_http_bind.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_http_bind.erl')
-rw-r--r--src/ejabberd_http_bind.erl1216
1 files changed, 0 insertions, 1216 deletions
diff --git a/src/ejabberd_http_bind.erl b/src/ejabberd_http_bind.erl
deleted file mode 100644
index 758c1cee5..000000000
--- a/src/ejabberd_http_bind.erl
+++ /dev/null
@@ -1,1216 +0,0 @@
-%%%----------------------------------------------------------------------
-%%% File : ejabberd_http_bind.erl
-%%% Author : Stefan Strigler <steve@zeank.in-berlin.de>
-%%% Purpose : Implements XMPP over BOSH (XEP-0206)
-%%% Created : 21 Sep 2005 by Stefan Strigler <steve@zeank.in-berlin.de>
-%%% Modified: may 2009 by Mickael Remond, Alexey Schepin
-%%%
-%%%
-%%% ejabberd, Copyright (C) 2002-2016 ProcessOne
-%%%
-%%% This program is free software; you can redistribute it and/or
-%%% modify it under the terms of the GNU General Public License as
-%%% published by the Free Software Foundation; either version 2 of the
-%%% License, or (at your option) any later version.
-%%%
-%%% This program is distributed in the hope that it will be useful,
-%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
-%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-%%% General Public License for more details.
-%%%
-%%% You should have received a copy of the GNU General Public License along
-%%% with this program; if not, write to the Free Software Foundation, Inc.,
-%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-%%%
-%%%----------------------------------------------------------------------
-
--module(ejabberd_http_bind).
-
--protocol({xep, 124, '1.11'}).
--protocol({xep, 206, '1.4'}).
-
--behaviour(gen_fsm).
-
-%% External exports
--export([start_link/4,
- init/1,
- handle_event/3,
- handle_sync_event/4,
- code_change/4,
- handle_info/3,
- terminate/3,
- send/2,
- send_xml/2,
- sockname/1,
- peername/1,
- setopts/2,
- controlling_process/2,
- become_controller/2,
- custom_receiver/1,
- reset_stream/1,
- change_shaper/2,
- monitor/1,
- close/1,
- start/5,
- handle_session_start/8,
- handle_http_put/7,
- http_put/7,
- http_get/2,
- prepare_response/4,
- process_request/3]).
-
--include("ejabberd.hrl").
--include("logger.hrl").
-
--include("jlib.hrl").
-
--include("ejabberd_http.hrl").
-
--include("http_bind.hrl").
-
--record(http_bind,
- {id, pid, to, hold, wait, process_delay, version}).
-
--define(NULL_PEER, {{0, 0, 0, 0}, 0}).
-
-%% http binding request
--record(hbr, {rid,
- key,
- out}).
-
--record(state, {id,
- rid = none,
- key,
- socket,
- output = "",
- input = queue:new(),
- waiting_input = false,
- shaper_state,
- shaper_timer,
- last_receiver,
- last_poll,
- http_receiver,
- out_of_order_receiver = false,
- wait_timer,
- ctime = 0,
- timer,
- pause = 0,
- unprocessed_req_list = [], % list of request that have been delayed for proper reordering: {Request, PID}
- req_list = [], % list of requests (cache)
- max_inactivity,
- max_pause,
- ip = ?NULL_PEER
- }).
-
-%% Internal request format:
--record(http_put, {rid,
- attrs,
- payload,
- payload_size,
- hold,
- stream,
- ip}).
-
-%%-define(DBGFSM, true).
--ifdef(DBGFSM).
-
--define(FSMOPTS, [{debug, [trace]}]).
-
--else.
-
--define(FSMOPTS, []).
-
--endif.
-
-%% Wait 100ms before continue processing, to allow the client provide more related stanzas.
--define(BOSH_VERSION, <<"1.8">>).
-
--define(NS_CLIENT, <<"jabber:client">>).
-
--define(NS_BOSH, <<"urn:xmpp:xbosh">>).
-
--define(NS_HTTP_BIND,
- <<"http://jabber.org/protocol/httpbind">>).
-
--define(MAX_REQUESTS, 2).
-
--define(MIN_POLLING, 2000000).
-
--define(MAX_WAIT, 3600).
-
--define(MAX_INACTIVITY, 30000).
-
--define(MAX_PAUSE, 120).
-
--define(PROCESS_DELAY_DEFAULT, 100).
-
--define(PROCESS_DELAY_MIN, 0).
-
--define(PROCESS_DELAY_MAX, 1000).
-
--define(PROCNAME_MHB, ejabberd_mod_http_bind).
-
-start(XMPPDomain, Sid, Key, IP, HOpts) ->
- ?DEBUG("Starting session", []),
- case catch gen_fsm:start(?MODULE,
- [Sid, Key, IP, HOpts],
- ?FSMOPTS)
- of
- {ok, Pid} -> {ok, Pid};
- _ -> check_bind_module(XMPPDomain),
- {error, "Cannot start HTTP bind session"}
- end.
-
-start_link(Sid, Key, IP, HOpts) ->
- gen_fsm:start_link(?MODULE, [Sid, Key, IP, HOpts], ?FSMOPTS).
-
-send({http_bind, FsmRef, _IP}, Packet) ->
- gen_fsm:sync_send_all_state_event(FsmRef,
- {send, Packet}).
-
-send_xml({http_bind, FsmRef, _IP}, Packet) ->
- gen_fsm:sync_send_all_state_event(FsmRef,
- {send_xml, Packet}).
-
-setopts({http_bind, FsmRef, _IP}, Opts) ->
- case lists:member({active, once}, Opts) of
- true ->
- gen_fsm:send_all_state_event(FsmRef, {activate, self()});
- _ ->
- ok
- end.
-
-controlling_process(_Socket, _Pid) -> ok.
-
-custom_receiver({http_bind, FsmRef, _IP}) ->
- {receiver, ?MODULE, FsmRef}.
-
-become_controller(FsmRef, C2SPid) ->
- gen_fsm:send_all_state_event(FsmRef,
- {become_controller, C2SPid}).
-
-reset_stream({http_bind, _FsmRef, _IP}) ->
- ok.
-
-change_shaper({http_bind, FsmRef, _IP}, Shaper) ->
- gen_fsm:send_all_state_event(FsmRef,
- {change_shaper, Shaper}).
-
-monitor({http_bind, FsmRef, _IP}) ->
- erlang:monitor(process, FsmRef).
-
-close({http_bind, FsmRef, _IP}) ->
- catch gen_fsm:sync_send_all_state_event(FsmRef,
- {stop, close}).
-
-sockname(_Socket) -> {ok, ?NULL_PEER}.
-
-peername({http_bind, _FsmRef, IP}) -> {ok, IP}.
-
-
-%% Entry point for data coming from client through ejabberd HTTP server:
-process_request(Data, IP, HOpts) ->
- Opts1 = ejabberd_c2s_config:get_c2s_limits(),
- Opts = [{xml_socket, true} | Opts1],
- MaxStanzaSize = case lists:keysearch(max_stanza_size, 1,
- Opts)
- of
- {value, {_, Size}} -> Size;
- _ -> infinity
- end,
- PayloadSize = iolist_size(Data),
- case catch parse_request(Data, PayloadSize,
- MaxStanzaSize)
- of
- %% No existing session:
- {ok, {<<"">>, Rid, Attrs, Payload}} ->
- case fxml:get_attr_s(<<"to">>, Attrs) of
- <<"">> ->
- ?DEBUG("Session not created (Improper addressing)", []),
- {200, ?HEADER,
- <<"<body type='terminate' condition='improper-ad"
- "dressing' xmlns='",
- (?NS_HTTP_BIND)/binary, "'/>">>};
- XmppDomain ->
- NXmppDomain = jid:nameprep(XmppDomain),
- Sid = p1_sha:sha(term_to_binary({p1_time_compat:monotonic_time(), make_ref()})),
- case start(NXmppDomain, Sid, <<"">>, IP, HOpts) of
- {error, _} ->
- {500, ?HEADER,
- <<"<body type='terminate' condition='internal-se"
- "rver-error' xmlns='",
- (?NS_HTTP_BIND)/binary,
- "'>Internal Server Error</body>">>};
- {ok, Pid} ->
- handle_session_start(Pid, NXmppDomain, Sid, Rid, Attrs,
- Payload, PayloadSize, IP)
- end
- end;
- %% Existing session
- {ok, {Sid, Rid, Attrs, Payload1}} ->
- StreamStart = case fxml:get_attr_s(<<"xmpp:restart">>,
- Attrs)
- of
- <<"true">> -> true;
- _ -> false
- end,
- Payload2 = case fxml:get_attr_s(<<"type">>, Attrs) of
- <<"terminate">> ->
- Payload1 ++ [{xmlstreamend, <<"stream:stream">>}];
- _ -> Payload1
- end,
- handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize,
- StreamStart, IP);
- {size_limit, Sid} ->
- case mnesia:dirty_read({http_bind, Sid}) of
- {error, _} -> {404, ?HEADER, <<"">>};
- {ok, #http_bind{pid = FsmRef}} ->
- gen_fsm:sync_send_all_state_event(FsmRef,
- {stop, close}),
- {200, ?HEADER,
- <<"<body type='terminate' condition='undefined-c"
- "ondition' xmlns='",
- (?NS_HTTP_BIND)/binary, "'>Request Too Large</body>">>}
- end;
- _ ->
- ?DEBUG("Received bad request: ~p", [Data]),
- {400, ?HEADER, <<"">>}
- end.
-
-handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,
- Payload, PayloadSize, IP) ->
- ?DEBUG("got pid: ~p", [Pid]),
- Wait = case str:to_integer(fxml:get_attr_s(<<"wait">>,
- Attrs))
- of
- {error, _} -> ?MAX_WAIT;
- {CWait, _} ->
- if CWait > (?MAX_WAIT) -> ?MAX_WAIT;
- true -> CWait
- end
- end,
- Hold = case str:to_integer(fxml:get_attr_s(<<"hold">>,
- Attrs))
- of
- {error, _} -> (?MAX_REQUESTS) - 1;
- {CHold, _} ->
- if CHold > (?MAX_REQUESTS) - 1 -> (?MAX_REQUESTS) - 1;
- true -> CHold
- end
- end,
- Pdelay = case
- str:to_integer(fxml:get_attr_s(<<"process-delay">>,
- Attrs))
- of
- {error, _} -> ?PROCESS_DELAY_DEFAULT;
- {CPdelay, _}
- when ((?PROCESS_DELAY_MIN) =< CPdelay) and
- (CPdelay =< (?PROCESS_DELAY_MAX)) ->
- CPdelay;
- {CPdelay, _} ->
- lists:max([lists:min([CPdelay, ?PROCESS_DELAY_MAX]),
- ?PROCESS_DELAY_MIN])
- end,
- Version = case catch
- list_to_float(binary_to_list(fxml:get_attr_s(<<"ver">>, Attrs)))
- of
- {'EXIT', _} -> 0.0;
- V -> V
- end,
- XmppVersion = fxml:get_attr_s(<<"xmpp:version">>, Attrs),
- ?DEBUG("Create session: ~p", [Sid]),
- mnesia:dirty_write(
- #http_bind{id = Sid,
- pid = Pid,
- to = {XmppDomain,
- XmppVersion},
- hold = Hold,
- wait = Wait,
- process_delay = Pdelay,
- version = Version
- }),
- handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, true, IP).
-
-%%%----------------------------------------------------------------------
-%%% Callback functions from gen_fsm
-%%%----------------------------------------------------------------------
-
-init([Sid, Key, IP, HOpts]) ->
- ?DEBUG("started: ~p", [{Sid, Key, IP}]),
- Opts1 = ejabberd_c2s_config:get_c2s_limits(),
- SOpts = lists:filtermap(fun({stream_management, _}) -> true;
- ({max_ack_queue, _}) -> true;
- ({resume_timeout, _}) -> true;
- ({max_resume_timeout, _}) -> true;
- ({resend_on_timeout, _}) -> true;
- (_) -> false
- end, HOpts),
-
- Opts = [{xml_socket, true} | SOpts ++ Opts1],
- Shaper = none,
- ShaperState = shaper:new(Shaper),
- Socket = {http_bind, self(), IP},
- ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, Opts),
- Timer = erlang:start_timer(?MAX_INACTIVITY, self(), []),
- {ok, loop, #state{id = Sid,
- key = Key,
- socket = Socket,
- shaper_state = ShaperState,
- max_inactivity = ?MAX_INACTIVITY,
- max_pause = ?MAX_PAUSE,
- timer = Timer}}.
-
-handle_event({become_controller, C2SPid}, StateName, StateData) ->
- case StateData#state.input of
- cancel ->
- {next_state, StateName,
- StateData#state{waiting_input = C2SPid}};
- Input ->
- lists:foreach(fun (Event) -> C2SPid ! Event end,
- queue:to_list(Input)),
- {next_state, StateName,
- StateData#state{input = queue:new(),
- waiting_input = C2SPid}}
- end;
-handle_event({change_shaper, Shaper}, StateName,
- StateData) ->
- NewShaperState = shaper:new(Shaper),
- {next_state, StateName,
- StateData#state{shaper_state = NewShaperState}};
-handle_event(_Event, StateName, StateData) ->
- {next_state, StateName, StateData}.
-
-handle_sync_event({send_xml, Packet}, _From, StateName,
- #state{http_receiver = undefined} = StateData) ->
- Output = [Packet | StateData#state.output],
- Reply = ok,
- {reply, Reply, StateName,
- StateData#state{output = Output}};
-handle_sync_event({send_xml, Packet}, _From, StateName,
- #state{out_of_order_receiver = true} = StateData) ->
- Output = [Packet | StateData#state.output],
- Reply = ok,
- {reply, Reply, StateName,
- StateData#state{output = Output}};
-handle_sync_event({send_xml, Packet}, _From, StateName,
- StateData) ->
- Output = [Packet | StateData#state.output],
- cancel_timer(StateData#state.timer),
- Timer = set_inactivity_timer(StateData#state.pause,
- StateData#state.max_inactivity),
- HTTPReply = {ok, Output},
- gen_fsm:reply(StateData#state.http_receiver, HTTPReply),
- cancel_timer(StateData#state.wait_timer),
- Rid = StateData#state.rid,
- ReqList = [#hbr{rid = Rid, key = StateData#state.key,
- out = Output}
- | [El
- || El <- StateData#state.req_list, El#hbr.rid /= Rid]],
- Reply = ok,
- {reply, Reply, StateName,
- StateData#state{output = [], http_receiver = undefined,
- req_list = ReqList, wait_timer = undefined,
- timer = Timer}};
-
-handle_sync_event({stop,close}, _From, _StateName, StateData) ->
- Reply = ok,
- {stop, normal, Reply, StateData};
-handle_sync_event({stop,stream_closed}, _From, _StateName, StateData) ->
- Reply = ok,
- {stop, normal, Reply, StateData};
-handle_sync_event({stop,Reason}, _From, _StateName, StateData) ->
- ?DEBUG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]),
- Reply = ok,
- {stop, normal, Reply, StateData};
-%% HTTP PUT: Receive packets from the client
-handle_sync_event(#http_put{rid = Rid}, _From,
- StateName, StateData)
- when StateData#state.shaper_timer /= undefined ->
- Pause = case
- erlang:read_timer(StateData#state.shaper_timer)
- of
- false -> 0;
- P -> P
- end,
- Reply = {wait, Pause},
- ?DEBUG("Shaper timer for RID ~p: ~p", [Rid, Reply]),
- {reply, Reply, StateName, StateData};
-handle_sync_event(#http_put{payload_size =
- PayloadSize} =
- Request,
- _From, StateName, StateData) ->
- ?DEBUG("New request: ~p", [Request]),
- {NewShaperState, NewShaperTimer} =
- update_shaper(StateData#state.shaper_state,
- PayloadSize),
- handle_http_put_event(Request, StateName,
- StateData#state{shaper_state = NewShaperState,
- shaper_timer = NewShaperTimer});
-%% HTTP GET: send packets to the client
-handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) ->
- TNow = p1_time_compat:system_time(micro_seconds),
- if (Hold > 0) and
- ((StateData#state.output == []) or (StateData#state.rid < Rid)) and
- ((TNow - StateData#state.ctime) < (Wait*1000*1000)) and
- (StateData#state.rid =< Rid) and
- (StateData#state.pause == 0) ->
- send_receiver_reply(StateData#state.http_receiver, {ok, empty}),
- cancel_timer(StateData#state.wait_timer),
- WaitTimer = erlang:start_timer(Wait * 1000, self(), []),
- cancel_timer(StateData#state.timer),
- {next_state, StateName, StateData#state{
- http_receiver = From,
- out_of_order_receiver = StateData#state.rid < Rid,
- wait_timer = WaitTimer,
- timer = undefined}};
- true ->
- cancel_timer(StateData#state.timer),
- Reply = {ok, StateData#state.output},
- ReqList = [#hbr{rid = Rid,
- key = StateData#state.key,
- out = StateData#state.output}
- | [El
- || El <- StateData#state.req_list,
- El#hbr.rid /= Rid]],
- if (StateData#state.http_receiver /= undefined) and
- StateData#state.out_of_order_receiver ->
- {reply, Reply, StateName,
- StateData#state{output = [], timer = undefined,
- req_list = ReqList,
- out_of_order_receiver = false}};
- true ->
- send_receiver_reply(StateData#state.http_receiver, {ok, empty}),
- cancel_timer(StateData#state.wait_timer),
- Timer = set_inactivity_timer(StateData#state.pause,
- StateData#state.max_inactivity),
- {reply, Reply, StateName,
- StateData#state{output = [],
- http_receiver = undefined,
- wait_timer = undefined,
- timer = Timer,
- req_list = ReqList}}
- end
- end;
-handle_sync_event(peername, _From, StateName,
- StateData) ->
- Reply = {ok, StateData#state.ip},
- {reply, Reply, StateName, StateData};
-handle_sync_event(_Event, _From, StateName,
- StateData) ->
- Reply = ok, {reply, Reply, StateName, StateData}.
-
-code_change(_OldVsn, StateName, StateData, _Extra) ->
- {ok, StateName, StateData}.
-
-handle_info({timeout, Timer, _}, _StateName,
- #state{id = SID, timer = Timer} = StateData) ->
- ?INFO_MSG("Session timeout. Closing the HTTP bind "
- "session: ~p",
- [SID]),
- {stop, normal, StateData};
-handle_info({timeout, WaitTimer, _}, StateName,
- #state{wait_timer = WaitTimer} = StateData) ->
- if StateData#state.http_receiver /= undefined ->
- cancel_timer(StateData#state.timer),
- Timer = set_inactivity_timer(StateData#state.pause,
- StateData#state.max_inactivity),
- gen_fsm:reply(StateData#state.http_receiver,
- {ok, empty}),
- Rid = StateData#state.rid,
- ReqList = [#hbr{rid = Rid, key = StateData#state.key,
- out = []}
- | [El
- || El <- StateData#state.req_list, El#hbr.rid /= Rid]],
- {next_state, StateName,
- StateData#state{http_receiver = undefined,
- req_list = ReqList, wait_timer = undefined,
- timer = Timer}};
- true -> {next_state, StateName, StateData}
- end;
-handle_info({timeout, ShaperTimer, _}, StateName,
- #state{shaper_timer = ShaperTimer} = StateData) ->
- {next_state, StateName, StateData#state{shaper_timer = undefined}};
-
-handle_info(_, StateName, StateData) ->
- {next_state, StateName, StateData}.
-
-terminate(_Reason, _StateName, StateData) ->
- ?DEBUG("terminate: Deleting session ~s",
- [StateData#state.id]),
- mnesia:dirty_delete({http_bind, StateData#state.id}),
- send_receiver_reply(StateData#state.http_receiver,
- {ok, terminate}),
- case StateData#state.waiting_input of
- false -> ok;
- C2SPid -> gen_fsm:send_event(C2SPid, closed)
- end,
- ok.
-
-%%%----------------------------------------------------------------------
-%%% Internal functions
-%%%----------------------------------------------------------------------
-
-%% PUT / Get processing:
-handle_http_put_event(#http_put{rid = Rid,
- attrs = Attrs, hold = Hold} =
- Request,
- StateName, StateData) ->
- ?DEBUG("New request: ~p", [Request]),
- RidAllow = rid_allow(StateData#state.rid, Rid, Attrs,
- Hold, StateData#state.max_pause),
- case RidAllow of
- buffer ->
- ?DEBUG("Buffered request: ~p", [Request]),
- PendingRequests = StateData#state.unprocessed_req_list,
- Requests = lists:keydelete(Rid, 2, PendingRequests),
- ReqList = [#hbr{rid = Rid, key = StateData#state.key,
- out = []}
- | [El
- || El <- StateData#state.req_list,
- El#hbr.rid > Rid - 1 - Hold]],
- ?DEBUG("reqlist: ~p", [ReqList]),
- UnprocessedReqList = [Request | Requests],
- cancel_timer(StateData#state.timer),
- Timer = set_inactivity_timer(0,
- StateData#state.max_inactivity),
- {reply, ok, StateName,
- StateData#state{unprocessed_req_list =
- UnprocessedReqList,
- req_list = ReqList, timer = Timer},
- hibernate};
- _ ->
- process_http_put(Request, StateName, StateData,
- RidAllow)
- end.
-
-process_http_put(#http_put{rid = Rid, attrs = Attrs,
- payload = Payload, hold = Hold, stream = StreamTo,
- ip = IP} =
- Request,
- StateName, StateData, RidAllow) ->
- ?DEBUG("Actually processing request: ~p", [Request]),
- Key = fxml:get_attr_s(<<"key">>, Attrs),
- NewKey = fxml:get_attr_s(<<"newkey">>, Attrs),
- KeyAllow = case RidAllow of
- repeat -> true;
- false -> false;
- {true, _} ->
- case StateData#state.key of
- <<"">> -> true;
- OldKey ->
- NextKey = p1_sha:sha(Key),
- ?DEBUG("Key/OldKey/NextKey: ~s/~s/~s",
- [Key, OldKey, NextKey]),
- if OldKey == NextKey -> true;
- true -> ?DEBUG("wrong key: ~s", [Key]), false
- end
- end
- end,
- TNow = p1_time_compat:system_time(micro_seconds),
- LastPoll = if Payload == [] -> TNow;
- true -> 0
- end,
- if (Payload == []) and (Hold == 0) and
- (TNow - StateData#state.last_poll < (?MIN_POLLING)) ->
- Reply = {error, polling_too_frequently},
- {reply, Reply, StateName, StateData};
- KeyAllow ->
- case RidAllow of
- false ->
- Reply = {error, not_exists},
- {reply, Reply, StateName, StateData};
- repeat ->
- ?DEBUG("REPEATING ~p", [Rid]),
- case [El#hbr.out
- || El <- StateData#state.req_list, El#hbr.rid == Rid]
- of
- [] -> {error, not_exists};
- [Out | _XS] ->
- if (Rid == StateData#state.rid) and
- (StateData#state.http_receiver /= undefined) ->
- {reply, ok, StateName, StateData};
- true ->
- Reply = {repeat, lists:reverse(Out)},
- {reply, Reply, StateName,
- StateData#state{last_poll = LastPoll}}
- end
- end;
- {true, Pause} ->
- SaveKey = if NewKey == <<"">> -> Key;
- true -> NewKey
- end,
- ?DEBUG(" -- SaveKey: ~s~n", [SaveKey]),
- ReqList1 = [El
- || El <- StateData#state.req_list,
- El#hbr.rid > Rid - 1 - Hold],
- ReqList = case lists:keymember(Rid, #hbr.rid, ReqList1)
- of
- true -> ReqList1;
- false ->
- [#hbr{rid = Rid, key = StateData#state.key,
- out = []}
- | ReqList1]
- end,
- ?DEBUG("reqlist: ~p", [ReqList]),
- cancel_timer(StateData#state.timer),
- Timer = set_inactivity_timer(Pause,
- StateData#state.max_inactivity),
- case StateData#state.waiting_input of
- false ->
- Input = lists:foldl(fun queue:in/2,
- StateData#state.input, Payload),
- Reply = ok,
- process_buffered_request(Reply, StateName,
- StateData#state{input = Input,
- rid = Rid,
- key = SaveKey,
- ctime = TNow,
- timer = Timer,
- pause = Pause,
- last_poll =
- LastPoll,
- req_list =
- ReqList,
- ip = IP});
- C2SPid ->
- case StreamTo of
- {To, <<"">>} ->
- gen_fsm:send_event(C2SPid,
- {xmlstreamstart,
- <<"stream:stream">>,
- [{<<"to">>, To},
- {<<"xmlns">>, ?NS_CLIENT},
- {<<"xmlns:stream">>,
- ?NS_STREAM}]});
- {To, Version} ->
- gen_fsm:send_event(C2SPid,
- {xmlstreamstart,
- <<"stream:stream">>,
- [{<<"to">>, To},
- {<<"xmlns">>, ?NS_CLIENT},
- {<<"version">>, Version},
- {<<"xmlns:stream">>,
- ?NS_STREAM}]});
- _ -> ok
- end,
- MaxInactivity = get_max_inactivity(StreamTo,
- StateData#state.max_inactivity),
- MaxPause = get_max_inactivity(StreamTo,
- StateData#state.max_pause),
- ?DEBUG("really sending now: ~p", [Payload]),
- lists:foreach(fun ({xmlstreamend, End}) ->
- gen_fsm:send_event(C2SPid,
- {xmlstreamend,
- End});
- (El) ->
- gen_fsm:send_event(C2SPid,
- {xmlstreamelement,
- El})
- end,
- Payload),
- Reply = ok,
- process_buffered_request(Reply, StateName,
- StateData#state{input =
- queue:new(),
- rid = Rid,
- key = SaveKey,
- ctime = TNow,
- timer = Timer,
- pause = Pause,
- last_poll =
- LastPoll,
- req_list =
- ReqList,
- max_inactivity =
- MaxInactivity,
- max_pause =
- MaxPause,
- ip = IP})
- end
- end;
- true ->
- Reply = {error, bad_key},
- {reply, Reply, StateName, StateData}
- end.
-
-process_buffered_request(Reply, StateName, StateData) ->
- Rid = StateData#state.rid,
- Requests = StateData#state.unprocessed_req_list,
- case lists:keysearch(Rid + 1, 2, Requests) of
- {value, Request} ->
- ?DEBUG("Processing buffered request: ~p", [Request]),
- NewRequests = lists:keydelete(Rid + 1, 2, Requests),
- handle_http_put_event(Request, StateName,
- StateData#state{unprocessed_req_list =
- NewRequests});
- _ -> {reply, Reply, StateName, StateData, hibernate}
- end.
-
-handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize,
- StreamStart, IP) ->
- case http_put(Sid, Rid, Attrs, Payload, PayloadSize,
- StreamStart, IP)
- of
- {error, not_exists} ->
- ?DEBUG("no session associated with sid: ~p", [Sid]),
- {404, ?HEADER, <<"">>};
- {{error, Reason}, Sess} ->
- ?DEBUG("Error on HTTP put. Reason: ~p", [Reason]),
- handle_http_put_error(Reason, Sess);
- {{repeat, OutPacket}, Sess} ->
- ?DEBUG("http_put said 'repeat!' ...~nOutPacket: ~p",
- [OutPacket]),
- send_outpacket(Sess, OutPacket);
- {{wait, Pause}, _Sess} ->
- ?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]),
- timer:sleep(Pause),
- handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize,
- StreamStart, IP);
- {ok, Sess} ->
- prepare_response(Sess, Rid, [], StreamStart)
- end.
-
-http_put(Sid, Rid, Attrs, Payload, PayloadSize,
- StreamStart, IP) ->
- ?DEBUG("Looking for session: ~p", [Sid]),
- case mnesia:dirty_read({http_bind, Sid}) of
- [] ->
- {error, not_exists};
- [#http_bind{pid = FsmRef, hold=Hold,
- to= {To, StreamVersion}} = Sess] ->
- NewStream = case StreamStart of
- true -> {To, StreamVersion};
- _ -> <<"">>
- end,
- {gen_fsm:sync_send_all_state_event(
- FsmRef, #http_put{rid = Rid,
- attrs = Attrs,
- payload = Payload,
- payload_size = PayloadSize,
- hold = Hold,
- stream = NewStream,
- ip = IP},
- 30000), Sess}
- end.
-
-handle_http_put_error(Reason,
- #http_bind{pid = FsmRef, version = Version})
- when Version >= 0 ->
- gen_fsm:sync_send_all_state_event(FsmRef,
- {stop, {put_error, Reason}}),
- case Reason of
- not_exists ->
- {200, ?HEADER,
- fxml:element_to_binary(#xmlel{name = <<"body">>,
- attrs =
- [{<<"xmlns">>, ?NS_HTTP_BIND},
- {<<"type">>, <<"terminate">>},
- {<<"condition">>,
- <<"item-not-found">>}],
- children = []})};
- bad_key ->
- {200, ?HEADER,
- fxml:element_to_binary(#xmlel{name = <<"body">>,
- attrs =
- [{<<"xmlns">>, ?NS_HTTP_BIND},
- {<<"type">>, <<"terminate">>},
- {<<"condition">>,
- <<"item-not-found">>}],
- children = []})};
- polling_too_frequently ->
- {200, ?HEADER,
- fxml:element_to_binary(#xmlel{name = <<"body">>,
- attrs =
- [{<<"xmlns">>, ?NS_HTTP_BIND},
- {<<"type">>, <<"terminate">>},
- {<<"condition">>,
- <<"policy-violation">>}],
- children = []})}
- end;
-handle_http_put_error(Reason,
- #http_bind{pid = FsmRef}) ->
- gen_fsm:sync_send_all_state_event(FsmRef,
- {stop, {put_error_no_version, Reason}}),
- case Reason of
- not_exists -> %% bad rid
- ?DEBUG("Closing HTTP bind session (Bad rid).", []),
- {404, ?HEADER, <<"">>};
- bad_key ->
- ?DEBUG("Closing HTTP bind session (Bad key).", []),
- {404, ?HEADER, <<"">>};
- polling_too_frequently ->
- ?DEBUG("Closing HTTP bind session (User polling "
- "too frequently).",
- []),
- {403, ?HEADER, <<"">>}
- end.
-
-%% Control RID ordering
-rid_allow(none, _NewRid, _Attrs, _Hold, _MaxPause) ->
- {true, 0};
-rid_allow(OldRid, NewRid, Attrs, Hold, MaxPause) ->
- ?DEBUG("Previous rid / New rid: ~p/~p",
- [OldRid, NewRid]),
- if
- %% We did not miss any packet, we can process it immediately:
- NewRid == OldRid + 1 ->
- case catch
- jlib:binary_to_integer(fxml:get_attr_s(<<"pause">>,
- Attrs))
- of
- {'EXIT', _} -> {true, 0};
- Pause1 when Pause1 =< MaxPause ->
- ?DEBUG("got pause: ~p", [Pause1]), {true, Pause1};
- _ -> {true, 0}
- end;
- %% We have missed packets, we need to cached it to process it later on:
- (OldRid < NewRid) and (NewRid =< OldRid + Hold + 1) ->
- buffer;
- (NewRid =< OldRid) and (NewRid > OldRid - Hold - 1) ->
- repeat;
- true -> false
- end.
-
-update_shaper(ShaperState, PayloadSize) ->
- {NewShaperState, Pause} = shaper:update(ShaperState,
- PayloadSize),
- if Pause > 0 ->
- ShaperTimer = erlang:start_timer(Pause, self(),
- activate),
- {NewShaperState, ShaperTimer};
- true -> {NewShaperState, undefined}
- end.
-
-prepare_response(Sess, Rid, OutputEls, StreamStart) ->
- receive after Sess#http_bind.process_delay -> ok end,
- case catch http_get(Sess, Rid) of
- {ok, cancel} ->
- {200, ?HEADER,
- <<"<body type='error' xmlns='", (?NS_HTTP_BIND)/binary,
- "'/>">>};
- {ok, empty} ->
- {200, ?HEADER,
- <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'/>">>};
- {ok, terminate} ->
- {200, ?HEADER,
- <<"<body type='terminate' xmlns='",
- (?NS_HTTP_BIND)/binary, "'/>">>};
- {ok, ROutPacket} ->
- OutPacket = lists:reverse(ROutPacket),
- ?DEBUG("OutPacket: ~p", [OutputEls ++ OutPacket]),
- prepare_outpacket_response(Sess, Rid,
- OutputEls ++ OutPacket, StreamStart);
- {'EXIT', {shutdown, _}} ->
- {200, ?HEADER,
- <<"<body type='terminate' condition='system-shut"
- "down' xmlns='",
- (?NS_HTTP_BIND)/binary, "'/>">>};
- {'EXIT', _Reason} ->
- {200, ?HEADER,
- <<"<body type='terminate' xmlns='",
- (?NS_HTTP_BIND)/binary, "'/>">>}
- end.
-
-%% Send output payloads on establised sessions
-prepare_outpacket_response(Sess, _Rid, OutPacket,
- false) ->
- case catch send_outpacket(Sess, OutPacket) of
- {'EXIT', _Reason} ->
- ?DEBUG("Error in sending packet ~p ", [_Reason]),
- {200, ?HEADER,
- <<"<body type='terminate' xmlns='",
- (?NS_HTTP_BIND)/binary, "'/>">>};
- SendRes -> SendRes
- end;
-%% Handle a new session along with its output payload
-prepare_outpacket_response(#http_bind{id = Sid,
- wait = Wait, hold = Hold, to = To} =
- _Sess,
- _Rid, OutPacket, true) ->
- case OutPacket of
- [{xmlstreamstart, _, OutAttrs} | Els] ->
- AuthID = fxml:get_attr_s(<<"id">>, OutAttrs),
- From = fxml:get_attr_s(<<"from">>, OutAttrs),
- Version = fxml:get_attr_s(<<"version">>, OutAttrs),
- OutEls = case Els of
- [] -> [];
- [{xmlstreamelement,
- #xmlel{name = <<"stream:features">>,
- attrs = StreamAttribs, children = StreamEls}}
- | StreamTail] ->
- TypedTail = [check_default_xmlns(OEl)
- || {xmlstreamelement, OEl} <- StreamTail],
- [#xmlel{name = <<"stream:features">>,
- attrs =
- [{<<"xmlns:stream">>, ?NS_STREAM}] ++
- StreamAttribs,
- children = StreamEls}]
- ++ TypedTail;
- StreamTail ->
- [check_default_xmlns(OEl)
- || {xmlstreamelement, OEl} <- StreamTail]
- end,
- case OutEls of
- [#xmlel{name = <<"stream:error">>}] ->
- {200, ?HEADER,
- <<"<body type='terminate' condition='host-unknow"
- "n' xmlns='",
- (?NS_HTTP_BIND)/binary, "'/>">>};
- _ ->
- BOSH_attribs = [{<<"authid">>, AuthID},
- {<<"xmlns:xmpp">>, ?NS_BOSH},
- {<<"xmlns:stream">>, ?NS_STREAM}]
- ++
- case OutEls of
- [] -> [];
- _ -> [{<<"xmpp:version">>, Version}]
- end,
- MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY),
- MaxPause = get_max_pause(To),
- {200, ?HEADER,
- fxml:element_to_binary(#xmlel{name = <<"body">>,
- attrs =
- [{<<"xmlns">>, ?NS_HTTP_BIND},
- {<<"sid">>, Sid},
- {<<"wait">>,
- iolist_to_binary(integer_to_list(Wait))},
- {<<"requests">>,
- iolist_to_binary(integer_to_list(Hold
- +
- 1))},
- {<<"inactivity">>,
- iolist_to_binary(integer_to_list(trunc(MaxInactivity
- /
- 1000)))},
- {<<"maxpause">>,
- iolist_to_binary(integer_to_list(MaxPause))},
- {<<"polling">>,
- iolist_to_binary(integer_to_list(trunc((?MIN_POLLING)
- /
- 1000000)))},
- {<<"ver">>, ?BOSH_VERSION},
- {<<"from">>, From},
- {<<"secure">>, <<"true">>}]
- ++ BOSH_attribs,
- children = OutEls})}
- end;
- _ ->
- {200, ?HEADER,
- <<"<body type='terminate' condition='internal-se"
- "rver-error' xmlns='",
- (?NS_HTTP_BIND)/binary, "'/>">>}
- end.
-
-http_get(#http_bind{pid = FsmRef, wait = Wait,
- hold = Hold},
- Rid) ->
- gen_fsm:sync_send_all_state_event(FsmRef,
- {http_get, Rid, Wait, Hold},
- 2 * (?MAX_WAIT) * 1000).
-
-send_outpacket(#http_bind{pid = FsmRef}, OutPacket) ->
- case OutPacket of
- [] ->
- {200, ?HEADER,
- <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'/>">>};
- [{xmlstreamend, _}] ->
- gen_fsm:sync_send_all_state_event(FsmRef,
- {stop, stream_closed}),
- {200, ?HEADER,
- <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'/>">>};
- _ ->
- AllElements = lists:all(fun ({xmlstreamelement,
- #xmlel{name = <<"stream:error">>}}) ->
- false;
- ({xmlstreamelement, _}) -> true;
- ({xmlstreamraw, _}) -> true;
- (_) -> false
- end,
- OutPacket),
- case AllElements of
- true ->
- TypedEls = lists:foldl(fun ({xmlstreamelement, El},
- Acc) ->
- Acc ++
- [fxml:element_to_binary(check_default_xmlns(El))];
- ({xmlstreamraw, R}, Acc) ->
- Acc ++ [R]
- end,
- [], OutPacket),
- Body = <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'>",
- (iolist_to_binary(TypedEls))/binary, "</body>">>,
- ?DEBUG(" --- outgoing data --- ~n~s~n --- END "
- "--- ~n",
- [Body]),
- {200, ?HEADER, Body};
- false ->
- case OutPacket of
- [{xmlstreamstart, _, _} | SEls] ->
- OutEls = case SEls of
- [{xmlstreamelement,
- #xmlel{name = <<"stream:features">>,
- attrs = StreamAttribs,
- children = StreamEls}}
- | StreamTail] ->
- TypedTail = [check_default_xmlns(OEl)
- || {xmlstreamelement, OEl}
- <- StreamTail],
- [#xmlel{name = <<"stream:features">>,
- attrs =
- [{<<"xmlns:stream">>,
- ?NS_STREAM}]
- ++ StreamAttribs,
- children = StreamEls}]
- ++ TypedTail;
- StreamTail ->
- [check_default_xmlns(OEl)
- || {xmlstreamelement, OEl} <- StreamTail]
- end,
- {200, ?HEADER,
- fxml:element_to_binary(#xmlel{name = <<"body">>,
- attrs =
- [{<<"xmlns">>,
- ?NS_HTTP_BIND}],
- children = OutEls})};
- _ ->
- SErrCond = lists:filter(fun ({xmlstreamelement,
- #xmlel{name =
- <<"stream:error">>}}) ->
- true;
- (_) -> false
- end,
- OutPacket),
- StreamErrCond = case SErrCond of
- [] -> null;
- [{xmlstreamelement,
- #xmlel{} = StreamErrorTag}
- | _] ->
- [StreamErrorTag]
- end,
- gen_fsm:sync_send_all_state_event(FsmRef,
- {stop,
- {stream_error,
- OutPacket}}),
- case StreamErrCond of
- null ->
- {200, ?HEADER,
- <<"<body type='terminate' condition='internal-se"
- "rver-error' xmlns='",
- (?NS_HTTP_BIND)/binary, "'/>">>};
- _ ->
- {200, ?HEADER,
- <<"<body type='terminate' condition='remote-stre"
- "am-error' xmlns='",
- (?NS_HTTP_BIND)/binary, "' ", "xmlns:stream='",
- (?NS_STREAM)/binary, "'>",
- (elements_to_string(StreamErrCond))/binary,
- "</body>">>}
- end
- end
- end
- end.
-
-parse_request(Data, PayloadSize, MaxStanzaSize) ->
- ?DEBUG("--- incoming data --- ~n~s~n --- END "
- "--- ",
- [Data]),
- case fxml_stream:parse_element(Data) of
- #xmlel{name = <<"body">>, attrs = Attrs,
- children = Els} ->
- Xmlns = fxml:get_attr_s(<<"xmlns">>, Attrs),
- if Xmlns /= (?NS_HTTP_BIND) -> {error, bad_request};
- true ->
- case catch
- jlib:binary_to_integer(fxml:get_attr_s(<<"rid">>,
- Attrs))
- of
- {'EXIT', _} -> {error, bad_request};
- Rid ->
- FixedEls = lists:filter(fun (I) ->
- case I of
- #xmlel{} -> true;
- _ -> false
- end
- end,
- Els),
- Sid = fxml:get_attr_s(<<"sid">>, Attrs),
- if PayloadSize =< MaxStanzaSize ->
- {ok, {Sid, Rid, Attrs, FixedEls}};
- true -> {size_limit, Sid}
- end
- end
- end;
- #xmlel{} -> {error, bad_request};
- {error, _Reason} -> {error, bad_request}
- end.
-
-send_receiver_reply(undefined, _Reply) -> ok;
-send_receiver_reply(Receiver, Reply) ->
- gen_fsm:reply(Receiver, Reply).
-
-%% Cancel timer and empty message queue.
-cancel_timer(undefined) -> ok;
-cancel_timer(Timer) ->
- erlang:cancel_timer(Timer),
- receive {timeout, Timer, _} -> ok after 0 -> ok end.
-
-%% If client asked for a pause (pause > 0), we apply the pause value
-%% as inactivity timer:
-set_inactivity_timer(Pause, _MaxInactivity)
- when Pause > 0 ->
- erlang:start_timer(Pause * 1000, self(), []);
-%% Otherwise, we apply the max_inactivity value as inactivity timer:
-set_inactivity_timer(_Pause, MaxInactivity) ->
- erlang:start_timer(MaxInactivity, self(), []).
-
-elements_to_string([]) -> [];
-elements_to_string([El | Els]) ->
- [fxml:element_to_binary(El) | elements_to_string(Els)].
-
-%% @spec (To, Default::integer()) -> integer()
-%% where To = [] | {Host::string(), Version::string()}
-get_max_inactivity({Host, _}, Default) ->
- case gen_mod:get_module_opt(Host, mod_http_bind, max_inactivity,
- fun(I) when is_integer(I), I>0 -> I end,
- undefined)
- of
- Seconds when is_integer(Seconds) -> Seconds * 1000;
- undefined -> Default
- end;
-get_max_inactivity(_, Default) -> Default.
-
-get_max_pause({Host, _}) ->
- gen_mod:get_module_opt(Host, mod_http_bind, max_pause,
- fun(I) when is_integer(I), I>0 -> I end,
- ?MAX_PAUSE);
-get_max_pause(_) -> ?MAX_PAUSE.
-
-check_default_xmlns(#xmlel{name = Name, attrs = Attrs,
- children = Els} =
- El) ->
- case fxml:get_tag_attr_s(<<"xmlns">>, El) of
- <<"">> ->
- #xmlel{name = Name,
- attrs = [{<<"xmlns">>, ?NS_CLIENT} | Attrs],
- children = Els};
- _ -> El
- end;
-check_default_xmlns(El) -> El.
-
-%% Check that mod_http_bind has been defined in config file.
-%% Print a warning in log file if this is not the case.
-check_bind_module(XmppDomain) ->
- case gen_mod:is_loaded(XmppDomain, mod_http_bind) of
- true -> true;
- false ->
- ?ERROR_MSG("You are trying to use BOSH (HTTP Bind) "
- "in host ~p, but the module mod_http_bind "
- "is not started in that host. Configure "
- "your BOSH client to connect to the correct "
- "host, or add your desired host to the "
- "configuration, or check your 'modules' "
- "section in your ejabberd configuration "
- "file.",
- [XmppDomain]),
- false
- end.