aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_http_bindjson.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_http_bindjson.erl')
-rw-r--r--src/ejabberd_http_bindjson.erl1229
1 files changed, 1229 insertions, 0 deletions
diff --git a/src/ejabberd_http_bindjson.erl b/src/ejabberd_http_bindjson.erl
new file mode 100644
index 000000000..75af8fdb4
--- /dev/null
+++ b/src/ejabberd_http_bindjson.erl
@@ -0,0 +1,1229 @@
+%%%----------------------------------------------------------------------
+%%% File : ejabberd_http_bindjson.erl
+%%% Original Bind Author : Stefan Strigler <steve@zeank.in-berlin.de>
+%%% Purpose : Implements XMPP over BOSH (XEP-0205) with a JSON Transport
+%%% Created : 23 Sep 2010 by Eric Cestari <ecestari@process-one.net>
+%%% Modified: may 2009 by Mickael Remond, Alexey Schepin
+%%% Id : $Id: ejabberd_http_bind.erl 953 2009-05-07 10:40:40Z alexey $
+%%%----------------------------------------------------------------------
+
+-module(ejabberd_http_bindjson).
+
+-behaviour(gen_fsm).
+
+%% External exports
+-export([start_link/3, init/1, handle_event/3,
+ handle_sync_event/4, code_change/4, handle_info/3,
+ terminate/3, send/2, send_xml/2, sockname/1, peername/1,
+ setopts/2, controlling_process/2, become_controller/2,
+ change_controller/2, custom_receiver/1, reset_stream/1,
+ change_shaper/2, monitor/1, close/1, start/4,
+ handle_session_start/8, handle_http_put/7, http_put/7,
+ http_get/2, prepare_response/4, process_request/2]).
+
+-include("ejabberd.hrl").
+-include("logger.hrl").
+
+-include("jlib.hrl").
+
+-include("ejabberd_http.hrl").
+
+-include("http_bind.hrl").
+
+-record(http_bind,
+{
+ id,
+ pid,
+ to,
+ hold,
+ wait,
+ process_delay,
+ version
+}).
+
+-define(NULL_PEER, {{0, 0, 0, 0}, 0}).
+
+-record(hbr, {rid, key, out}).
+
+-record(state,
+{
+ id,
+ rid = none,
+ key,
+ socket,
+ output = [],
+ input = queue:new(),
+ waiting_input = false,
+ shaper_state,
+ shaper_timer,
+ last_receiver,
+ last_poll,
+ http_receiver,
+ wait_timer,
+ ctime = 0,
+ timer,
+ pause = 0,
+ unprocessed_req_list = [],
+ req_list = [],
+ max_inactivity,
+ max_pause,
+ ip = ?NULL_PEER
+}).
+
+-record(http_put,
+{
+ rid,
+ attrs,
+ payload,
+ payload_size,
+ hold, stream,
+ ip
+}).
+
+%%-define(DBGFSM, true).
+-ifdef(DBGFSM).
+
+-define(FSMOPTS, [{debug, [trace]}]).
+
+-else.
+
+-define(FSMOPTS, []).
+
+-endif.
+
+-define(BOSH_VERSION, <<"1.8">>).
+
+-define(NS_CLIENT, <<"jabber:client">>).
+
+-define(NS_BOSH, <<"urn:xmpp:xbosh">>).
+
+-define(NS_HTTP_BIND,
+ <<"http://jabber.org/protocol/httpbind">>).
+
+-define(MAX_REQUESTS, 2).
+
+-define(MIN_POLLING, 2000000).
+
+-define(MAX_WAIT, 3600).
+
+-define(MAX_INACTIVITY, 30000).
+
+-define(MAX_PAUSE, 120).
+
+-define(PROCESS_DELAY_DEFAULT, 100).
+
+-define(PROCESS_DELAY_MIN, 0).
+
+-define(PROCESS_DELAY_MAX, 1000).
+
+start(XMPPDomain, Sid, Key, IP) ->
+ ?DEBUG("Starting session", []),
+ case catch
+ supervisor:start_child(ejabberd_http_bind_sup,
+ [Sid, Key, IP])
+ of
+ {ok, Pid} -> {ok, Pid};
+ {error, _} = Err ->
+ case check_bind_module(XMPPDomain) of
+ false -> {error, <<"Cannot start HTTP bind session">>};
+ true ->
+ ?ERROR_MSG("Cannot start HTTP bind session: ~p", [Err]),
+ Err
+ end;
+ Exit ->
+ ?ERROR_MSG("Cannot start HTTP bind session: ~p",
+ [Exit]),
+ {error, Exit}
+ end.
+
+start_link(Sid, Key, IP) ->
+ gen_fsm:start_link(?MODULE, [Sid, Key, IP], ?FSMOPTS).
+
+send({http_bind, FsmRef, _IP}, Packet) ->
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {send, Packet}).
+
+send_xml({http_bind, FsmRef, _IP}, Packet) ->
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {send_xml, Packet}).
+
+setopts({http_bind, FsmRef, _IP}, Opts) ->
+ case lists:member({active, once}, Opts) of
+ true ->
+ gen_fsm:send_all_state_event(FsmRef,
+ {activate, self()});
+ _ ->
+ case lists:member({active, false}, Opts) of
+ true ->
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ deactivate_socket);
+ _ -> ok
+ end
+ end.
+
+controlling_process(_Socket, _Pid) -> ok.
+
+custom_receiver({http_bind, FsmRef, _IP}) ->
+ {receiver, ?MODULE, FsmRef}.
+
+become_controller(FsmRef, C2SPid) ->
+ gen_fsm:send_all_state_event(FsmRef,
+ {become_controller, C2SPid}).
+
+change_controller({http_bind, FsmRef, _IP}, C2SPid) ->
+ become_controller(FsmRef, C2SPid).
+
+reset_stream({http_bind, _FsmRef, _IP}) -> ok.
+
+change_shaper({http_bind, FsmRef, _IP}, Shaper) ->
+ gen_fsm:send_all_state_event(FsmRef,
+ {change_shaper, Shaper}).
+
+monitor({http_bind, FsmRef, _IP}) ->
+ erlang:monitor(process, FsmRef).
+
+close({http_bind, FsmRef, _IP}) ->
+ catch gen_fsm:sync_send_all_state_event(FsmRef,
+ {stop, close}).
+
+sockname(_Socket) -> {ok, ?NULL_PEER}.
+
+peername({http_bind, _FsmRef, IP}) -> {ok, IP}.
+
+process_request(Data, IP) ->
+ Opts1 = ejabberd_c2s_config:get_c2s_limits(),
+ Opts = [{xml_socket, true} | Opts1],
+ MaxStanzaSize = case lists:keysearch(max_stanza_size, 1,
+ Opts)
+ of
+ {value, {_, Size}} -> Size;
+ _ -> infinity
+ end,
+ PayloadSize = iolist_size(Data),
+ case catch parse_request(Data, PayloadSize,
+ MaxStanzaSize)
+ of
+ %% No existing session:
+ {ok, {<<"">>, Rid, Attrs, Payload}} ->
+ case xml:get_attr_s(<<"to">>, Attrs) of
+ <<"">> ->
+ ?DEBUG("Session not created (Improper addressing)", []),
+ {200, ?HEADER,
+ <<"{\"body\":{\"type\":\"terminate\" \"condition\""
+ ":\"improper-addressing\", \"xmlns\":\"",
+ (?NS_HTTP_BIND)/binary, "\"}}">>};
+ XmppDomain ->
+ Sid = make_sid(),
+ case start(XmppDomain, Sid, <<"">>, IP) of
+ {error, _} ->
+ {500, ?HEADER,
+ <<"{\"body\":{\"type\":\"terminate\" \"condition\""
+ ":\"internal-server-error\", \"xmlns\":\"",
+ (?NS_HTTP_BIND)/binary,
+ "\",\"$\":\"Internal Server Error\"}}">>};
+ {ok, Pid} ->
+ handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,
+ Payload, PayloadSize, IP)
+ end
+ end;
+ %% Existing session
+ {ok, {Sid, Rid, Attrs, Payload1}} ->
+ StreamStart = case xml:get_attr_s(<<"xmpp:restart">>,
+ Attrs)
+ of
+ <<"true">> -> true;
+ _ -> false
+ end,
+ Payload2 = case xml:get_attr_s(<<"type">>, Attrs) of
+ <<"terminate">> ->
+ Payload1 ++ [{xmlstreamend, <<"stream:stream">>}];
+ _ -> Payload1
+ end,
+ handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize,
+ StreamStart, IP);
+ {size_limit, Sid} ->
+ case get_session(Sid) of
+ {error, _} -> {404, ?HEADER, <<"">>};
+ {ok, #http_bind{pid = FsmRef}} ->
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {stop, close}),
+ {200, ?HEADER,
+ <<"{\"body\": {\"type\"=\"terminate\" \"conditio"
+ "n\":\"undefined-condition\" \"xmlns\":\"",
+ (?NS_HTTP_BIND)/binary,
+ "\", \"$\":\"Request Too Large\"}}">>}
+ end;
+ _ ->
+ ?DEBUG("Received bad request: ~p", [Data]),
+ {400, ?HEADER, <<"">>}
+ end.
+
+handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,
+ Payload, PayloadSize, IP) ->
+ ?DEBUG("got pid: ~p", [Pid]),
+ Wait = case str:to_integer(xml:get_attr_s(<<"wait">>,
+ Attrs))
+ of
+ {error, _} -> ?MAX_WAIT;
+ {CWait, _} ->
+ if CWait > (?MAX_WAIT) -> ?MAX_WAIT;
+ true -> CWait
+ end
+ end,
+ Hold = case str:to_integer(xml:get_attr_s(<<"hold">>,
+ Attrs))
+ of
+ {error, _} -> (?MAX_REQUESTS) - 1;
+ {CHold, _} ->
+ if CHold > (?MAX_REQUESTS) - 1 -> (?MAX_REQUESTS) - 1;
+ true -> CHold
+ end
+ end,
+ Pdelay = case
+ str:to_integer(xml:get_attr_s(<<"process-delay">>,
+ Attrs))
+ of
+ {error, _} -> ?PROCESS_DELAY_DEFAULT;
+ {CPdelay, _}
+ when ((?PROCESS_DELAY_MIN) =< CPdelay) and
+ (CPdelay =< (?PROCESS_DELAY_MAX)) ->
+ CPdelay;
+ {CPdelay, _} ->
+ erlang:max(erlang:min(CPdelay, ?PROCESS_DELAY_MAX),
+ ?PROCESS_DELAY_MIN)
+ end,
+ Version = case catch
+ list_to_float(binary_to_list(xml:get_attr_s(<<"ver">>, Attrs)))
+ of
+ {'EXIT', _} -> 0.0;
+ V -> V
+ end,
+ XmppVersion = xml:get_attr_s(<<"xmpp:version">>, Attrs),
+ ?DEBUG("Create session: ~p", [Sid]),
+ mnesia:async_dirty(fun () ->
+ mnesia:write(#http_bind{id = Sid, pid = Pid,
+ to =
+ {XmppDomain,
+ XmppVersion},
+ hold = Hold, wait = Wait,
+ process_delay = Pdelay,
+ version = Version})
+ end),
+ handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize,
+ true, IP).
+
+%%%----------------------------------------------------------------------
+%%% Callback functions from gen_fsm
+%%%----------------------------------------------------------------------
+
+init([Sid, Key, IP]) ->
+ ?DEBUG("started: ~p", [{Sid, Key, IP}]),
+ Opts1 = ejabberd_c2s_config:get_c2s_limits(),
+ Opts = [{xml_socket, true} | Opts1],
+ Shaper = none,
+ ShaperState = shaper:new(Shaper),
+ Socket = {http_bind, self(), IP},
+ ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket,
+ Opts),
+ Timer = erlang:start_timer(?MAX_INACTIVITY, self(), []),
+ {ok, loop,
+ #state{id = Sid, key = Key, socket = Socket,
+ shaper_state = ShaperState,
+ max_inactivity = ?MAX_INACTIVITY,
+ max_pause = ?MAX_PAUSE, timer = Timer}}.
+
+handle_event({become_controller, C2SPid}, StateName,
+ StateData) ->
+ erlang:monitor(process, C2SPid),
+ case StateData#state.input of
+ cancel ->
+ {next_state, StateName,
+ StateData#state{waiting_input = C2SPid}};
+ Input ->
+ lists:foreach(fun (Event) -> C2SPid ! Event end,
+ queue:to_list(Input)),
+ {next_state, StateName,
+ StateData#state{input = queue:new(),
+ waiting_input = C2SPid}}
+ end;
+handle_event({change_shaper, Shaper}, StateName,
+ StateData) ->
+ NewShaperState = shaper:new(Shaper),
+ {next_state, StateName,
+ StateData#state{shaper_state = NewShaperState}};
+handle_event(_Event, StateName, StateData) ->
+ {next_state, StateName, StateData}.
+
+handle_sync_event({send_xml, Packet}, _From, StateName,
+ #state{http_receiver = undefined} = StateData) ->
+ Output = [Packet | StateData#state.output],
+ Reply = ok,
+ {reply, Reply, StateName,
+ StateData#state{output = Output}};
+handle_sync_event({send_xml, Packet}, _From, StateName,
+ StateData) ->
+ Output = [Packet | StateData#state.output],
+ cancel_timer(StateData#state.timer),
+ Timer = set_inactivity_timer(StateData#state.pause,
+ StateData#state.max_inactivity),
+ HTTPReply = {ok, Output},
+ gen_fsm:reply(StateData#state.http_receiver, HTTPReply),
+ cancel_timer(StateData#state.wait_timer),
+ Rid = StateData#state.rid,
+ ReqList = [#hbr{rid = Rid, key = StateData#state.key,
+ out = Output}
+ | [El
+ || El <- StateData#state.req_list, El#hbr.rid /= Rid]],
+ Reply = ok,
+ {reply, Reply, StateName,
+ StateData#state{output = [], http_receiver = undefined,
+ req_list = ReqList, wait_timer = undefined,
+ timer = Timer}};
+handle_sync_event({stop, close}, _From, _StateName,
+ StateData) ->
+ Reply = ok, {stop, normal, Reply, StateData};
+handle_sync_event({stop, stream_closed}, _From,
+ _StateName, StateData) ->
+ Reply = ok, {stop, normal, Reply, StateData};
+handle_sync_event(deactivate_socket, _From, StateName,
+ StateData) ->
+ {reply, ok, StateName,
+ StateData#state{waiting_input = false}};
+handle_sync_event({stop, Reason}, _From, _StateName,
+ StateData) ->
+ ?DEBUG("Closing bind session ~p - Reason: ~p",
+ [StateData#state.id, Reason]),
+ Reply = ok,
+ {stop, normal, Reply, StateData};
+%% HTTP PUT: Receive packets from the client
+handle_sync_event(#http_put{rid = Rid}, _From,
+ StateName, StateData)
+ when StateData#state.shaper_timer /= undefined ->
+ Pause = case
+ erlang:read_timer(StateData#state.shaper_timer)
+ of
+ false -> 0;
+ P -> P
+ end,
+ Reply = {wait, Pause},
+ ?DEBUG("Shaper timer for RID ~p: ~p", [Rid, Reply]),
+ {reply, Reply, StateName, StateData};
+handle_sync_event(#http_put{payload_size =
+ PayloadSize} =
+ Request,
+ _From, StateName, StateData) ->
+ ?DEBUG("New request: ~p", [Request]),
+ {NewShaperState, NewShaperTimer} =
+ update_shaper(StateData#state.shaper_state,
+ PayloadSize),
+ handle_http_put_event(Request, StateName,
+ StateData#state{shaper_state = NewShaperState,
+ shaper_timer = NewShaperTimer});
+%% HTTP GET: send packets to the client
+handle_sync_event({http_get, Rid, Wait, Hold}, From,
+ StateName, StateData) ->
+ send_receiver_reply(StateData#state.http_receiver,
+ {ok, empty}),
+ cancel_timer(StateData#state.wait_timer),
+ TNow = tnow(),
+ if (Hold > 0) and (StateData#state.output == []) and
+ (TNow - StateData#state.ctime < Wait * 1000 * 1000)
+ and (StateData#state.rid == Rid)
+ and (StateData#state.input /= cancel)
+ and (StateData#state.pause == 0) ->
+ WaitTimer = erlang:start_timer(Wait * 1000, self(), []),
+ cancel_timer(StateData#state.timer),
+ {next_state, StateName,
+ StateData#state{http_receiver = From,
+ wait_timer = WaitTimer, timer = undefined}};
+ StateData#state.input == cancel ->
+ cancel_timer(StateData#state.timer),
+ Timer = set_inactivity_timer(StateData#state.pause,
+ StateData#state.max_inactivity),
+ Reply = {ok, cancel},
+ {reply, Reply, StateName,
+ StateData#state{input = queue:new(),
+ http_receiver = undefined, wait_timer = undefined,
+ timer = Timer}};
+ true ->
+ cancel_timer(StateData#state.timer),
+ Timer = set_inactivity_timer(StateData#state.pause,
+ StateData#state.max_inactivity),
+ Reply = {ok, StateData#state.output},
+ ReqList = [#hbr{rid = Rid, key = StateData#state.key,
+ out = StateData#state.output}
+ | [El
+ || El <- StateData#state.req_list, El#hbr.rid /= Rid]],
+ {reply, Reply, StateName,
+ StateData#state{output = [], http_receiver = undefined,
+ wait_timer = undefined, timer = Timer,
+ req_list = ReqList}}
+ end;
+handle_sync_event(peername, _From, StateName,
+ StateData) ->
+ Reply = {ok, StateData#state.ip},
+ {reply, Reply, StateName, StateData};
+handle_sync_event(_Event, _From, StateName,
+ StateData) ->
+ Reply = ok, {reply, Reply, StateName, StateData}.
+
+code_change(_OldVsn, StateName, StateData, _Extra) ->
+ {ok, StateName, StateData}.
+
+handle_info({timeout, Timer, _}, _StateName,
+ #state{id = SID, timer = Timer} = StateData) ->
+ ?INFO_MSG("Session timeout. Closing the HTTP bind "
+ "session: ~p",
+ [SID]),
+ {stop, normal, StateData};
+handle_info({timeout, WaitTimer, _}, StateName,
+ #state{wait_timer = WaitTimer} = StateData) ->
+ if StateData#state.http_receiver /= undefined ->
+ cancel_timer(StateData#state.timer),
+ Timer = set_inactivity_timer(StateData#state.pause,
+ StateData#state.max_inactivity),
+ gen_fsm:reply(StateData#state.http_receiver,
+ {ok, empty}),
+ Rid = StateData#state.rid,
+ ReqList = [#hbr{rid = Rid, key = StateData#state.key,
+ out = []}
+ | [El
+ || El <- StateData#state.req_list, El#hbr.rid /= Rid]],
+ {next_state, StateName,
+ StateData#state{http_receiver = undefined,
+ req_list = ReqList, wait_timer = undefined,
+ timer = Timer}};
+ true -> {next_state, StateName, StateData}
+ end;
+handle_info({timeout, ShaperTimer, _}, StateName,
+ #state{shaper_timer = ShaperTimer} = StateData) ->
+ {next_state, StateName,
+ StateData#state{shaper_timer = undefined}};
+handle_info({'DOWN', _MRef, process, C2SPid, _},
+ _StateName,
+ #state{waiting_input = C2SPid} = StateData) ->
+ {stop, normal, StateData};
+handle_info(_, StateName, StateData) ->
+ {next_state, StateName, StateData}.
+
+terminate(_Reason, _StateName, StateData) ->
+ ?DEBUG("terminate: Deleting session ~s",
+ [StateData#state.id]),
+ mnesia:dirty_delete({http_bind, StateData#state.id}),
+ send_receiver_reply(StateData#state.http_receiver,
+ {ok, terminate}),
+ case StateData#state.waiting_input of
+ false -> ok;
+ C2SPid -> gen_fsm:send_event(C2SPid, closed)
+ end,
+ ok.
+
+%%%----------------------------------------------------------------------
+%%% Internal functions
+%%%----------------------------------------------------------------------
+
+handle_http_put_event(#http_put{rid = Rid,
+ attrs = Attrs, hold = Hold} =
+ Request,
+ StateName, StateData) ->
+ ?DEBUG("New request: ~p", [Request]),
+ RidAllow = rid_allow(StateData#state.rid, Rid, Attrs,
+ Hold, StateData#state.max_pause),
+ case RidAllow of
+ buffer ->
+ ?DEBUG("Buffered request: ~p", [Request]),
+ PendingRequests = StateData#state.unprocessed_req_list,
+ Requests = lists:keydelete(Rid, 2, PendingRequests),
+ ReqList = [#hbr{rid = Rid, key = StateData#state.key,
+ out = []}
+ | [El
+ || El <- StateData#state.req_list,
+ El#hbr.rid > Rid - 1 - Hold]],
+ ?DEBUG("reqlist: ~p", [ReqList]),
+ UnprocessedReqList = [Request | Requests],
+ cancel_timer(StateData#state.timer),
+ Timer = set_inactivity_timer(0,
+ StateData#state.max_inactivity),
+ {reply, buffered, StateName,
+ StateData#state{unprocessed_req_list =
+ UnprocessedReqList,
+ req_list = ReqList, timer = Timer}};
+ _ ->
+ process_http_put(Request, StateName, StateData,
+ RidAllow)
+ end.
+
+process_http_put(#http_put{rid = Rid, attrs = Attrs,
+ payload = Payload, hold = Hold, stream = StreamTo,
+ ip = IP} =
+ Request,
+ StateName, StateData, RidAllow) ->
+ ?DEBUG("Actually processing request: ~p", [Request]),
+ Key = xml:get_attr_s(<<"key">>, Attrs),
+ NewKey = xml:get_attr_s(<<"newkey">>, Attrs),
+ KeyAllow = case RidAllow of
+ repeat -> true;
+ false -> false;
+ {true, _} ->
+ case StateData#state.key of
+ <<"">> -> true;
+ OldKey ->
+ NextKey = sha:sha(Key),
+ ?DEBUG("Key/OldKey/NextKey: ~s/~s/~s",
+ [Key, OldKey, NextKey]),
+ if OldKey == NextKey -> true;
+ true -> ?DEBUG("wrong key: ~s", [Key]), false
+ end
+ end
+ end,
+ TNow = tnow(),
+ LastPoll = if Payload == [] -> TNow;
+ true -> 0
+ end,
+ if (Payload == []) and (Hold == 0) and
+ (TNow - StateData#state.last_poll < (?MIN_POLLING)) ->
+ Reply = {error, polling_too_frequently},
+ {reply, Reply, StateName, StateData};
+ KeyAllow ->
+ case RidAllow of
+ false ->
+ Reply = {error, not_exists},
+ {reply, Reply, StateName, StateData};
+ repeat ->
+ ?DEBUG("REPEATING ~p", [Rid]),
+ Reply = case [El#hbr.out
+ || El <- StateData#state.req_list,
+ El#hbr.rid == Rid]
+ of
+ [] -> {error, not_exists};
+ [Out | _XS] -> {repeat, lists:reverse(Out)}
+ end,
+ {reply, Reply, StateName,
+ StateData#state{input = cancel, last_poll = LastPoll}};
+ {true, Pause} ->
+ SaveKey = if NewKey == <<"">> -> Key;
+ true -> NewKey
+ end,
+ ?DEBUG(" -- SaveKey: ~s~n", [SaveKey]),
+ ReqList1 = [El
+ || El <- StateData#state.req_list,
+ El#hbr.rid > Rid - 1 - Hold],
+ ReqList = case lists:keymember(Rid, #hbr.rid, ReqList1)
+ of
+ true -> ReqList1;
+ false ->
+ [#hbr{rid = Rid, key = StateData#state.key,
+ out = []}
+ | ReqList1]
+ end,
+ ?DEBUG("reqlist: ~p", [ReqList]),
+ cancel_timer(StateData#state.timer),
+ Timer = set_inactivity_timer(Pause,
+ StateData#state.max_inactivity),
+ case StateData#state.waiting_input of
+ false ->
+ Input = lists:foldl(fun queue:in/2,
+ StateData#state.input, Payload),
+ Reply = ok,
+ process_buffered_request(Reply, StateName,
+ StateData#state{input = Input,
+ rid = Rid,
+ key = SaveKey,
+ ctime = TNow,
+ timer = Timer,
+ pause = Pause,
+ last_poll =
+ LastPoll,
+ req_list =
+ ReqList,
+ ip = IP});
+ C2SPid ->
+ case StreamTo of
+ {To, <<"">>} ->
+ gen_fsm:send_event(C2SPid,
+ {xmlstreamstart,
+ <<"stream:stream">>,
+ [{<<"to">>, To},
+ {<<"xmlns">>, ?NS_CLIENT},
+ {<<"xmlns:stream">>,
+ ?NS_STREAM}]});
+ {To, Version} ->
+ gen_fsm:send_event(C2SPid,
+ {xmlstreamstart,
+ <<"stream:stream">>,
+ [{<<"to">>, To},
+ {<<"xmlns">>, ?NS_CLIENT},
+ {<<"version">>, Version},
+ {<<"xmlns:stream">>,
+ ?NS_STREAM}]});
+ _ -> ok
+ end,
+ MaxInactivity = get_max_inactivity(StreamTo,
+ StateData#state.max_inactivity),
+ MaxPause = get_max_inactivity(StreamTo,
+ StateData#state.max_pause),
+ ?DEBUG("really sending now: ~p", [Payload]),
+ lists:foreach(fun ({xmlstreamend, End}) ->
+ gen_fsm:send_event(C2SPid,
+ {xmlstreamend,
+ End});
+ (El) ->
+ gen_fsm:send_event(C2SPid,
+ {xmlstreamelement,
+ El})
+ end,
+ Payload),
+ Reply = ok,
+ process_buffered_request(Reply, StateName,
+ StateData#state{input =
+ queue:new(),
+ rid = Rid,
+ key = SaveKey,
+ ctime = TNow,
+ timer = Timer,
+ pause = Pause,
+ last_poll =
+ LastPoll,
+ req_list =
+ ReqList,
+ max_inactivity =
+ MaxInactivity,
+ max_pause =
+ MaxPause,
+ ip = IP})
+ end
+ end;
+ true ->
+ Reply = {error, bad_key},
+ {reply, Reply, StateName, StateData}
+ end.
+
+process_buffered_request(Reply, StateName, StateData) ->
+ Rid = StateData#state.rid,
+ Requests = StateData#state.unprocessed_req_list,
+ case lists:keysearch(Rid + 1, 2, Requests) of
+ {value, Request} ->
+ ?DEBUG("Processing buffered request: ~p", [Request]),
+ NewRequests = lists:keydelete(Rid + 1, 2, Requests),
+ handle_http_put_event(Request, StateName,
+ StateData#state{unprocessed_req_list =
+ NewRequests});
+ _ -> {reply, Reply, StateName, StateData, hibernate}
+ end.
+
+handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize,
+ StreamStart, IP) ->
+ case http_put(Sid, Rid, Attrs, Payload, PayloadSize,
+ StreamStart, IP)
+ of
+ {error, not_exists} ->
+ ?DEBUG("no session associated with sid: ~p", [Sid]),
+ {404, ?HEADER, <<"">>};
+ {{error, Reason}, Sess} ->
+ ?DEBUG("Error on HTTP put. Reason: ~p", [Reason]),
+ handle_http_put_error(Reason, Sess);
+ {{repeat, OutPacket}, Sess} ->
+ ?DEBUG("http_put said \"repeat!\" ...~nOutPacket: ~p",
+ [OutPacket]),
+ send_outpacket(Sess, OutPacket);
+ {{wait, Pause}, _Sess} ->
+ ?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]),
+ timer:sleep(Pause),
+ handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize,
+ StreamStart, IP);
+ {buffered, _Sess} ->
+ {200, ?HEADER,
+ <<"{\"body\":{ \"xmlns\":\"", (?NS_HTTP_BIND)/binary,
+ "\"}}">>};
+ {ok, Sess} ->
+ prepare_response(Sess, Rid, [], StreamStart)
+ end.
+
+http_put(Sid, Rid, Attrs, Payload, PayloadSize,
+ StreamStart, IP) ->
+ ?DEBUG("Looking for session: ~p", [Sid]),
+ case get_session(Sid) of
+ {error, _} -> {error, not_exists};
+ {ok,
+ #http_bind{pid = FsmRef, hold = Hold,
+ to = {To, StreamVersion}} =
+ Sess} ->
+ NewStream = case StreamStart of
+ true -> {To, StreamVersion};
+ _ -> <<"">>
+ end,
+ {gen_fsm:sync_send_all_state_event(FsmRef,
+ #http_put{rid = Rid, attrs = Attrs,
+ payload = Payload,
+ payload_size =
+ PayloadSize,
+ hold = Hold,
+ stream = NewStream,
+ ip = IP},
+ 30000),
+ Sess}
+ end.
+
+handle_http_put_error(Reason,
+ #http_bind{pid = FsmRef, version = Version})
+ when Version >= 0 ->
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {stop, {put_error, Reason}}),
+ case Reason of
+ not_exists ->
+ {200, ?HEADER,
+ jiffy:encode(xmpp_json:to_json(#xmlel{name =
+ <<"body">>,
+ attrs =
+ [{<<"xmlns">>,
+ ?NS_HTTP_BIND},
+ {<<"type">>,
+ <<"terminate">>},
+ {<<"condition">>,
+ <<"item-not-found">>}],
+ children = []}))};
+ bad_key ->
+ {200, ?HEADER,
+ jiffy:encode(xmpp_json:to_json(#xmlel{name =
+ <<"body">>,
+ attrs =
+ [{<<"xmlns">>,
+ ?NS_HTTP_BIND},
+ {<<"type">>,
+ <<"terminate">>},
+ {<<"condition">>,
+ <<"item-not-found">>}],
+ children = []}))};
+ polling_too_frequently ->
+ {200, ?HEADER,
+ jiffy:encode(xmpp_json:to_json(#xmlel{name =
+ <<"body">>,
+ attrs =
+ [{<<"xmlns">>,
+ ?NS_HTTP_BIND},
+ {<<"type">>,
+ <<"terminate">>},
+ {<<"condition">>,
+ <<"policy-violation">>}],
+ children = []}))}
+ end;
+handle_http_put_error(Reason,
+ #http_bind{pid = FsmRef}) ->
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {stop, {put_error_no_version, Reason}}),
+ case Reason of
+ not_exists -> %% bad rid
+ ?DEBUG("Closing HTTP bind session (Bad rid).", []),
+ {404, ?HEADER, <<"">>};
+ bad_key ->
+ ?DEBUG("Closing HTTP bind session (Bad key).", []),
+ {404, ?HEADER, <<"">>};
+ polling_too_frequently ->
+ ?DEBUG("Closing HTTP bind session (User polling "
+ "too frequently).",
+ []),
+ {403, ?HEADER, <<"">>}
+ end.
+
+rid_allow(none, _NewRid, _Attrs, _Hold, _MaxPause) ->
+ {true, 0};
+rid_allow(OldRid, NewRid, Attrs, Hold, MaxPause) ->
+ ?DEBUG("Previous rid / New rid: ~p/~p",
+ [OldRid, NewRid]),
+ if
+ %% We did not miss any packet, we can process it immediately:
+ NewRid == OldRid + 1 ->
+ case catch
+ jlib:binary_to_integer(xml:get_attr_s(<<"pause">>,
+ Attrs))
+ of
+ {'EXIT', _} -> {true, 0};
+ Pause1 when Pause1 =< MaxPause ->
+ ?DEBUG("got pause: ~p", [Pause1]), {true, Pause1};
+ _ -> {true, 0}
+ end;
+ %% We have missed packets, we need to cached it to process it later on:
+ (OldRid < NewRid) and (NewRid =< OldRid + Hold + 1) ->
+ buffer;
+ (NewRid =< OldRid) and (NewRid > OldRid - Hold - 1) ->
+ repeat;
+ true -> false
+ end.
+
+update_shaper(ShaperState, PayloadSize) ->
+ {NewShaperState, Pause} = shaper:update(ShaperState,
+ PayloadSize),
+ if Pause > 0 ->
+ ShaperTimer = erlang:start_timer(Pause, self(),
+ activate),
+ {NewShaperState, ShaperTimer};
+ true -> {NewShaperState, undefined}
+ end.
+
+prepare_response(Sess, Rid, OutputEls, StreamStart) ->
+ receive after Sess#http_bind.process_delay -> ok end,
+ case catch http_get(Sess, Rid) of
+ {ok, cancel} ->
+ {200, ?HEADER,
+ <<"{\"body\": {\"type\":\"error\", \"xmlns\":\"",
+ (?NS_HTTP_BIND)/binary, "\"/>">>};
+ {ok, empty} ->
+ {200, ?HEADER,
+ <<"{\"body\":{ \"xmlns\":\"", (?NS_HTTP_BIND)/binary,
+ "\"}}">>};
+ {ok, terminate} ->
+ {200, ?HEADER,
+ <<"{\"body\": {\"type\":\"terminate\", "
+ "\"xmlns\":\"",
+ (?NS_HTTP_BIND)/binary, "\"/>">>};
+ {ok, ROutPacket} ->
+ OutPacket = lists:reverse(ROutPacket),
+ ?DEBUG("OutPacket: ~p", [OutputEls ++ OutPacket]),
+ prepare_outpacket_response(Sess, Rid,
+ OutputEls ++ OutPacket, StreamStart);
+ {'EXIT', {shutdown, _}} ->
+ {200, ?HEADER,
+ <<"{\"body\": {\"type\":\"terminate\",\"conditio"
+ "n\":\"system-shutdown\", \"xmlns\":\"",
+ (?NS_HTTP_BIND)/binary, "\"}}">>};
+ {'EXIT', _Reason} ->
+ {200, ?HEADER,
+ <<"{\"body\": {\"type\":\"terminate, \"xmlns\":\"",
+ (?NS_HTTP_BIND)/binary, "\"}}">>}
+ end.
+
+prepare_outpacket_response(Sess, _Rid, OutPacket,
+ false) ->
+ case catch send_outpacket(Sess, OutPacket) of
+ {'EXIT', _Reason} ->
+ {200, ?HEADER,
+ <<"{\"body\": {\"type\":\"terminate\", "
+ "\"xmlns\":\"",
+ (?NS_HTTP_BIND)/binary, "\"}}">>};
+ SendRes -> SendRes
+ end;
+%% Handle a new session along with its output payload
+prepare_outpacket_response(#http_bind{id = Sid,
+ wait = Wait, hold = Hold, to = To} =
+ Sess,
+ Rid, OutPacket, true) ->
+ case OutPacket of
+ [{xmlstreamstart, _, OutAttrs} | Els] ->
+ AuthID = xml:get_attr_s(<<"id">>, OutAttrs),
+ From = xml:get_attr_s(<<"from">>, OutAttrs),
+ Version = xml:get_attr_s(<<"version">>, OutAttrs),
+ OutEls = case Els of
+ [] -> [];
+ [{xmlstreamelement,
+ #xmlel{name = <<"stream:features">>,
+ attrs = StreamAttribs, children = StreamEls}}
+ | StreamTail] ->
+ TypedTail = [check_default_xmlns(OEl)
+ || {xmlstreamelement, OEl} <- StreamTail],
+ [#xmlel{name = <<"stream:features">>,
+ attrs =
+ [{<<"xmlns:stream">>, ?NS_STREAM}] ++
+ StreamAttribs,
+ children = StreamEls}]
+ ++ TypedTail;
+ StreamTail ->
+ [check_default_xmlns(OEl)
+ || {xmlstreamelement, OEl} <- StreamTail]
+ end,
+ case OutEls of
+ [] -> prepare_response(Sess, Rid, OutPacket, true);
+ [#xmlel{name = <<"stream:error">>}] ->
+ {200, ?HEADER,
+ <<"{\"body\" : {\"type\":\"terminate\", "
+ "\"condition\":\"host-unknown\", \"xmlns\"=\"",
+ (?NS_HTTP_BIND)/binary, "\"}}">>};
+ _ ->
+ BOSH_attribs = [{<<"authid">>, AuthID},
+ {<<"xmlns:xmpp">>, ?NS_BOSH},
+ {<<"xmlns:stream">>, ?NS_STREAM},
+ {<<"xmpp:version">>, Version}],
+% ++
+% case OutEls of
+% [] -> [];
+% _ -> [{<<"xmpp:version">>, Version}]
+% end,
+ MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY),
+ MaxPause = get_max_pause(To),
+ {200, ?HEADER,
+ jiffy:encode(xmpp_json:to_json(#xmlel{name =
+ <<"body">>,
+ attrs =
+ [{<<"xmlns">>,
+ ?NS_HTTP_BIND},
+ {<<"sid">>,
+ Sid},
+ {<<"wait">>,
+ iolist_to_binary(integer_to_list(Wait))},
+ {<<"requests">>,
+ iolist_to_binary(integer_to_list(Hold
+ +
+ 1))},
+ {<<"inactivity">>,
+ iolist_to_binary(integer_to_list(trunc(MaxInactivity
+ /
+ 1000)))},
+ {<<"maxpause">>,
+ iolist_to_binary(integer_to_list(MaxPause))},
+ {<<"polling">>,
+ iolist_to_binary(integer_to_list(trunc((?MIN_POLLING)
+ /
+ 1000000)))},
+ {<<"ver">>,
+ ?BOSH_VERSION},
+ {<<"from">>,
+ From},
+ {<<"secure">>,
+ <<"true">>}]
+ ++
+ BOSH_attribs,
+ children =
+ OutEls}))}
+ end;
+ _ ->
+ {200, ?HEADER,
+ <<"{\"body\" : {\"type\":\"terminate\", "
+ "\"condition\":\"internal-server-error\", "
+ "\"xmlns\"=\"",
+ (?NS_HTTP_BIND)/binary, "\"}}">>}
+ end.
+
+http_get(#http_bind{pid = FsmRef, wait = Wait,
+ hold = Hold},
+ Rid) ->
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {http_get, Rid, Wait, Hold},
+ 2 * (?MAX_WAIT) * 1000).
+
+send_outpacket(#http_bind{pid = FsmRef}, OutPacket) ->
+ case OutPacket of
+ [] ->
+ {200, ?HEADER,
+ <<"{\"body\": {\"xmlns\":\"", (?NS_HTTP_BIND)/binary,
+ "\"}}">>};
+ [{xmlstreamend, _}] ->
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {stop, stream_closed}),
+ {200, ?HEADER,
+ <<"{\"body\": {\"xmlns\":", (?NS_HTTP_BIND)/binary,
+ "\"}}">>};
+ _ ->
+ AllElements = lists:all(fun ({xmlstreamelement,
+ #xmlel{name = <<"stream:error">>}}) ->
+ false;
+ ({xmlstreamelement, _}) -> true;
+ (_) -> false
+ end,
+ OutPacket),
+ case AllElements of
+ true ->
+ TypedEls = [check_default_xmlns(OEl)
+ || {xmlstreamelement, OEl} <- OutPacket],
+ Body = jiffy:encode(xmpp_json:to_json(#xmlel{name =
+ <<"body">>,
+ attrs =
+ [{<<"xmlns">>,
+ ?NS_HTTP_BIND}],
+ children =
+ TypedEls})),
+ ?DEBUG(" --- outgoing data --- ~n~s~n --- END "
+ "--- ~n",
+ [Body]),
+ {200, ?HEADER, Body};
+ false ->
+ case OutPacket of
+ [{xmlstreamstart, _, _} | SEls] ->
+ OutEls = case SEls of
+ [{xmlstreamelement,
+ #xmlel{name = <<"stream:features">>,
+ attrs = StreamAttribs,
+ children = StreamEls}}
+ | StreamTail] ->
+ TypedTail = [check_default_xmlns(OEl)
+ || {xmlstreamelement, OEl}
+ <- StreamTail],
+ [#xmlel{name = <<"stream:features">>,
+ attrs =
+ [{<<"xmlns:stream">>,
+ ?NS_STREAM}]
+ ++ StreamAttribs,
+ children = StreamEls}]
+ ++ TypedTail;
+ StreamTail ->
+ [check_default_xmlns(OEl)
+ || {xmlstreamelement, OEl} <- StreamTail]
+ end,
+ {200, ?HEADER,
+ jiffy:encode(xmpp_json:to_json(#xmlel{name =
+ <<"body">>,
+ attrs =
+ [{<<"xmlns">>,
+ ?NS_HTTP_BIND}],
+ children =
+ OutEls}))};
+ _ ->
+ SErrCond = lists:filter(fun ({xmlstreamelement,
+ #xmlel{name =
+ <<"stream:error">>}}) ->
+ true;
+ (_) -> false
+ end,
+ OutPacket),
+ StreamErrCond = case SErrCond of
+ [] -> null;
+ [{xmlstreamelement,
+ #xmlel{} = StreamErrorTag}
+ | _] ->
+ [StreamErrorTag]
+ end,
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {stop,
+ {stream_error,
+ OutPacket}}),
+ case StreamErrCond of
+ null ->
+ {200, ?HEADER,
+ <<"{\"body\" : {\"\"type\"\":\"terminate\", "
+ "\"condition\":\"internal-server-error\", "
+ "\"xmlns\"=\"",
+ (?NS_HTTP_BIND)/binary, "\"}}">>};
+ _ ->
+ {200, ?HEADER,
+ <<"{\"body\" : {\"\"type\"\":\"terminate\", "
+ "\"condition\":\"remote-stream-error\", "
+ "\"xmlns\":\"",
+ (?NS_HTTP_BIND)/binary, "\", ",
+ "\"xmlns:stream\":\"", (?NS_STREAM)/binary,
+ "\" \"$\":",
+ (elements_to_string(StreamErrCond))/binary,
+ "}}">>}
+ end
+ end
+ end
+ end.
+
+parse_request(Data, PayloadSize, MaxStanzaSize) ->
+ ?DEBUG("--- incoming data --- ~n~p~n --- END "
+ "--- ",
+ [xmpp_json:from_json(jiffy:decode(Data))]),
+ case xmpp_json:from_json(jiffy:decode(Data)) of
+ {xmlstreamelement,
+ #xmlel{name = <<"body">>, attrs = Attrs,
+ children = Els}} ->
+ Xmlns = xml:get_attr_s(<<"xmlns">>, Attrs),
+ if Xmlns /= (?NS_HTTP_BIND) -> {error, bad_request};
+ true ->
+ case catch
+ jlib:binary_to_integer(xml:get_attr_s(<<"rid">>,
+ Attrs))
+ of
+ {'EXIT', _} -> {error, bad_request};
+ Rid ->
+ FixedEls = lists:filter(fun (I) ->
+ case I of
+ #xmlel{} -> true;
+ _ -> false
+ end
+ end,
+ Els),
+ Sid = xml:get_attr_s(<<"sid">>, Attrs),
+ if PayloadSize =< MaxStanzaSize ->
+ {ok, {Sid, Rid, Attrs, FixedEls}};
+ true -> {size_limit, Sid}
+ end
+ end
+ end;
+ _ -> {error, bad_request}
+ end.
+
+send_receiver_reply(undefined, _Reply) -> ok;
+send_receiver_reply(Receiver, Reply) ->
+ gen_fsm:reply(Receiver, Reply).
+
+cancel_timer(undefined) -> ok;
+cancel_timer(Timer) ->
+ erlang:cancel_timer(Timer),
+ receive {timeout, Timer, _} -> ok after 0 -> ok end.
+
+set_inactivity_timer(Pause, _MaxInactivity)
+ when Pause > 0 ->
+ erlang:start_timer(Pause * 1000, self(), []);
+%% Otherwise, we apply the max_inactivity value as inactivity timer:
+set_inactivity_timer(_Pause, MaxInactivity) ->
+ erlang:start_timer(MaxInactivity, self(), []).
+
+elements_to_string([]) -> [];
+elements_to_string([El | Els]) ->
+ [jiffy:encode(xmpp_json:to_json(El))
+ | elements_to_string(Els)].
+
+get_max_inactivity({Host, _}, Default) ->
+ case gen_mod:get_module_opt(Host, mod_http_bind, max_inactivity,
+ fun(I) when is_integer(I), I>0 -> I end,
+ undefined)
+ of
+ Seconds when is_integer(Seconds) -> Seconds * 1000;
+ undefined -> Default
+ end;
+get_max_inactivity(_, Default) -> Default.
+
+get_max_pause({Host, _}) ->
+ gen_mod:get_module_opt(Host, mod_http_bind, max_pause,
+ fun(I) when is_integer(I), I>0 -> I end,
+ ?MAX_PAUSE);
+get_max_pause(_) -> ?MAX_PAUSE.
+
+tnow() ->
+ {TMegSec, TSec, TMSec} = now(),
+ (TMegSec * 1000000 + TSec) * 1000000 + TMSec.
+
+check_default_xmlns(#xmlel{name = Name, attrs = Attrs,
+ children = Els} =
+ El) ->
+ case xml:get_tag_attr_s(<<"xmlns">>, El) of
+ <<"">> ->
+ #xmlel{name = Name,
+ attrs = [{<<"xmlns">>, ?NS_CLIENT} | Attrs],
+ children = Els};
+ _ -> El
+ end.
+
+check_bind_module(XmppDomain) ->
+ case gen_mod:is_loaded(XmppDomain, mod_http_bind) of
+ true -> true;
+ false ->
+ ?ERROR_MSG("You are trying to use BOSH (HTTP Bind), "
+ "but the module mod_http_bind is not "
+ "started.~nCheck your 'modules' section "
+ "in your ejabberd configuration file.",
+ []),
+ false
+ end.
+
+make_sid() ->
+ <<(sha:sha(term_to_binary({now(), make_ref()})))/binary,
+ "-", (ejabberd_cluster:node_id())/binary>>.
+
+get_session(SID) ->
+ case str:tokens(SID, <<"-">>) of
+ [_, NodeID] ->
+ case ejabberd_cluster:get_node_by_id(NodeID) of
+ Node when Node == node() ->
+ case mnesia:dirty_read({http_bind, SID}) of
+ [] -> {error, enoent};
+ [Session] -> {ok, Session}
+ end;
+ Node ->
+ case catch rpc:call(Node, mnesia, dirty_read,
+ [{http_bind, SID}], 5000)
+ of
+ [Session] -> {ok, Session};
+ _ -> {error, enoent}
+ end
+ end;
+ _ -> {error, enoent}
+ end.