summaryrefslogtreecommitdiff
path: root/src/web/ejabberd_http_bind.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/web/ejabberd_http_bind.erl')
-rw-r--r--src/web/ejabberd_http_bind.erl1559
1 files changed, 770 insertions, 789 deletions
diff --git a/src/web/ejabberd_http_bind.erl b/src/web/ejabberd_http_bind.erl
index 02b8d27b..91329167 100644
--- a/src/web/ejabberd_http_bind.erl
+++ b/src/web/ejabberd_http_bind.erl
@@ -41,11 +41,15 @@
process_request/2]).
-include("ejabberd.hrl").
+
-include("jlib.hrl").
+
-include("ejabberd_http.hrl").
+
-include("http_bind.hrl").
--record(http_bind, {id, pid, to, hold, wait, process_delay, version}).
+-record(http_bind,
+ {id, pid, to, hold, wait, process_delay, version}).
-define(NULL_PEER, {{0, 0, 0, 0}, 0}).
@@ -89,28 +93,39 @@
%%-define(DBGFSM, true).
-ifdef(DBGFSM).
+
-define(FSMOPTS, [{debug, [trace]}]).
+
-else.
+
-define(FSMOPTS, []).
+
-endif.
--define(BOSH_VERSION, "1.8").
--define(NS_CLIENT, "jabber:client").
--define(NS_BOSH, "urn:xmpp:xbosh").
--define(NS_HTTP_BIND, "http://jabber.org/protocol/httpbind").
+%% Wait 100ms before continue processing, to allow the client provide more related stanzas.
+-define(BOSH_VERSION, <<"1.8">>).
--define(MAX_REQUESTS, 2). % number of simultaneous requests
--define(MIN_POLLING, 2000000). % don't poll faster than that or we will
- % shoot you (time in microsec)
--define(MAX_WAIT, 3600). % max num of secs to keep a request on hold
--define(MAX_INACTIVITY, 30000). % msecs to wait before terminating
- % idle sessions
--define(MAX_PAUSE, 120). % may num of sec a client is allowed to pause
- % the session
+-define(NS_CLIENT, <<"jabber:client">>).
+
+-define(NS_BOSH, <<"urn:xmpp:xbosh">>).
+
+-define(NS_HTTP_BIND,
+ <<"http://jabber.org/protocol/httpbind">>).
+
+-define(MAX_REQUESTS, 2).
+
+-define(MIN_POLLING, 2000000).
+
+-define(MAX_WAIT, 3600).
+
+-define(MAX_INACTIVITY, 30000).
+
+-define(MAX_PAUSE, 120).
-%% Wait 100ms before continue processing, to allow the client provide more related stanzas.
-define(PROCESS_DELAY_DEFAULT, 100).
+
-define(PROCESS_DELAY_MIN, 0).
+
-define(PROCESS_DELAY_MAX, 1000).
%% Line copied from mod_http_bind.erl
@@ -134,10 +149,12 @@ start_link(Sid, Key, IP) ->
gen_fsm:start_link(?MODULE, [Sid, Key, IP], ?FSMOPTS).
send({http_bind, FsmRef, _IP}, Packet) ->
- gen_fsm:sync_send_all_state_event(FsmRef, {send, Packet}).
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {send, Packet}).
send_xml({http_bind, FsmRef, _IP}, Packet) ->
- gen_fsm:sync_send_all_state_event(FsmRef, {send_xml, Packet}).
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {send_xml, Packet}).
setopts({http_bind, FsmRef, _IP}, Opts) ->
case lists:member({active, once}, Opts) of
@@ -147,142 +164,143 @@ setopts({http_bind, FsmRef, _IP}, Opts) ->
ok
end.
-controlling_process(_Socket, _Pid) ->
- ok.
+controlling_process(_Socket, _Pid) -> ok.
custom_receiver({http_bind, FsmRef, _IP}) ->
{receiver, ?MODULE, FsmRef}.
become_controller(FsmRef, C2SPid) ->
- gen_fsm:send_all_state_event(FsmRef, {become_controller, C2SPid}).
+ gen_fsm:send_all_state_event(FsmRef,
+ {become_controller, C2SPid}).
reset_stream({http_bind, _FsmRef, _IP}) ->
ok.
change_shaper({http_bind, FsmRef, _IP}, Shaper) ->
- gen_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}).
+ gen_fsm:send_all_state_event(FsmRef,
+ {change_shaper, Shaper}).
monitor({http_bind, FsmRef, _IP}) ->
erlang:monitor(process, FsmRef).
close({http_bind, FsmRef, _IP}) ->
- catch gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}).
+ catch gen_fsm:sync_send_all_state_event(FsmRef,
+ {stop, close}).
-sockname(_Socket) ->
- {ok, ?NULL_PEER}.
+sockname(_Socket) -> {ok, ?NULL_PEER}.
-peername({http_bind, _FsmRef, IP}) ->
- {ok, IP}.
+peername({http_bind, _FsmRef, IP}) -> {ok, IP}.
%% Entry point for data coming from client through ejabberd HTTP server:
process_request(Data, IP) ->
Opts1 = ejabberd_c2s_config:get_c2s_limits(),
Opts = [{xml_socket, true} | Opts1],
- MaxStanzaSize =
- case lists:keysearch(max_stanza_size, 1, Opts) of
- {value, {_, Size}} -> Size;
- _ -> infinity
- end,
+ MaxStanzaSize = case lists:keysearch(max_stanza_size, 1,
+ Opts)
+ of
+ {value, {_, Size}} -> Size;
+ _ -> infinity
+ end,
PayloadSize = iolist_size(Data),
- case catch parse_request(Data, PayloadSize, MaxStanzaSize) of
- %% No existing session:
- {ok, {"", Rid, Attrs, Payload}} ->
- case xml:get_attr_s("to",Attrs) of
- "" ->
- ?DEBUG("Session not created (Improper addressing)", []),
- {200, ?HEADER, "<body type='terminate' "
- "condition='improper-addressing' "
- "xmlns='" ++ ?NS_HTTP_BIND ++ "'/>"};
- XmppDomain ->
- %% create new session
- Sid = sha:sha(term_to_binary({now(), make_ref()})),
- case start(XmppDomain, Sid, "", IP) of
- {error, _} ->
- {200, ?HEADER, "<body type='terminate' "
- "condition='internal-server-error' "
- "xmlns='" ++ ?NS_HTTP_BIND ++ "'>BOSH module not started</body>"};
- {ok, Pid} ->
- handle_session_start(
- Pid, XmppDomain, Sid, Rid, Attrs,
- Payload, PayloadSize, IP)
- end
- end;
- %% Existing session
- {ok, {Sid, Rid, Attrs, Payload1}} ->
- StreamStart =
- case xml:get_attr_s("xmpp:restart",Attrs) of
- "true" ->
- true;
- _ ->
- false
- end,
- Payload2 = case xml:get_attr_s("type",Attrs) of
- "terminate" ->
- %% close stream
- Payload1 ++ [{xmlstreamend, "stream:stream"}];
- _ ->
- Payload1
- end,
- handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize,
- StreamStart, IP);
- {size_limit, Sid} ->
- case mnesia:dirty_read({http_bind, Sid}) of
- [] ->
- {404, ?HEADER, ""};
- [#http_bind{pid = FsmRef}] ->
- gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}),
- {200, ?HEADER, "<body type='terminate' "
- "condition='undefined-condition' "
- "xmlns='" ++ ?NS_HTTP_BIND ++ "'>Request Too Large</body>"}
- end;
- _ ->
- ?DEBUG("Received bad request: ~p", [Data]),
- {400, ?HEADER, ""}
+ case catch parse_request(Data, PayloadSize,
+ MaxStanzaSize)
+ of
+ %% No existing session:
+ {ok, {<<"">>, Rid, Attrs, Payload}} ->
+ case xml:get_attr_s(<<"to">>, Attrs) of
+ <<"">> ->
+ ?DEBUG("Session not created (Improper addressing)", []),
+ {200, ?HEADER,
+ <<"<body type='terminate' condition='improper-ad"
+ "dressing' xmlns='",
+ (?NS_HTTP_BIND)/binary, "'/>">>};
+ XmppDomain ->
+ Sid = sha:sha(term_to_binary({now(), make_ref()})),
+ case start(XmppDomain, Sid, <<"">>, IP) of
+ {error, _} ->
+ {500, ?HEADER,
+ <<"<body type='terminate' condition='internal-se"
+ "rver-error' xmlns='",
+ (?NS_HTTP_BIND)/binary,
+ "'>Internal Server Error</body>">>};
+ {ok, Pid} ->
+ handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,
+ Payload, PayloadSize, IP)
+ end
+ end;
+ %% Existing session
+ {ok, {Sid, Rid, Attrs, Payload1}} ->
+ StreamStart = case xml:get_attr_s(<<"xmpp:restart">>,
+ Attrs)
+ of
+ <<"true">> -> true;
+ _ -> false
+ end,
+ Payload2 = case xml:get_attr_s(<<"type">>, Attrs) of
+ <<"terminate">> ->
+ Payload1 ++ [{xmlstreamend, <<"stream:stream">>}];
+ _ -> Payload1
+ end,
+ handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize,
+ StreamStart, IP);
+ {size_limit, Sid} ->
+ case mnesia:dirty_read({http_bind, Sid}) of
+ {error, _} -> {404, ?HEADER, <<"">>};
+ {ok, #http_bind{pid = FsmRef}} ->
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {stop, close}),
+ {200, ?HEADER,
+ <<"<body type='terminate' condition='undefined-c"
+ "ondition' xmlns='",
+ (?NS_HTTP_BIND)/binary, "'>Request Too Large</body>">>}
+ end;
+ _ ->
+ ?DEBUG("Received bad request: ~p", [Data]),
+ {400, ?HEADER, <<"">>}
end.
handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,
Payload, PayloadSize, IP) ->
?DEBUG("got pid: ~p", [Pid]),
- Wait = case string:to_integer(xml:get_attr_s("wait",Attrs)) of
- {error, _} ->
- ?MAX_WAIT;
- {CWait, _} ->
- if
- (CWait > ?MAX_WAIT) ->
- ?MAX_WAIT;
- true ->
- CWait
- end
+ Wait = case str:to_integer(xml:get_attr_s(<<"wait">>,
+ Attrs))
+ of
+ {error, _} -> ?MAX_WAIT;
+ {CWait, _} ->
+ if CWait > (?MAX_WAIT) -> ?MAX_WAIT;
+ true -> CWait
+ end
end,
- Hold = case string:to_integer(xml:get_attr_s("hold",Attrs)) of
- {error, _} ->
- (?MAX_REQUESTS - 1);
- {CHold, _} ->
- if
- (CHold > (?MAX_REQUESTS - 1)) ->
- (?MAX_REQUESTS - 1);
- true ->
- CHold
- end
+ Hold = case str:to_integer(xml:get_attr_s(<<"hold">>,
+ Attrs))
+ of
+ {error, _} -> (?MAX_REQUESTS) - 1;
+ {CHold, _} ->
+ if CHold > (?MAX_REQUESTS) - 1 -> (?MAX_REQUESTS) - 1;
+ true -> CHold
+ end
end,
- Pdelay = case string:to_integer(xml:get_attr_s("process-delay",Attrs)) of
- {error, _} ->
- ?PROCESS_DELAY_DEFAULT;
- {CPdelay, _} when
- (?PROCESS_DELAY_MIN =< CPdelay) and
- (CPdelay =< ?PROCESS_DELAY_MAX) ->
- CPdelay;
- {CPdelay, _} ->
- lists:max([lists:min([CPdelay, ?PROCESS_DELAY_MAX]), ?PROCESS_DELAY_MIN])
+ Pdelay = case
+ str:to_integer(xml:get_attr_s(<<"process-delay">>,
+ Attrs))
+ of
+ {error, _} -> ?PROCESS_DELAY_DEFAULT;
+ {CPdelay, _}
+ when ((?PROCESS_DELAY_MIN) =< CPdelay) and
+ (CPdelay =< (?PROCESS_DELAY_MAX)) ->
+ CPdelay;
+ {CPdelay, _} ->
+ lists:max([lists:min([CPdelay, ?PROCESS_DELAY_MAX]),
+ ?PROCESS_DELAY_MIN])
end,
- Version =
- case catch list_to_float(
- xml:get_attr_s("ver", Attrs)) of
- {'EXIT', _} -> 0.0;
- V -> V
- end,
- XmppVersion = xml:get_attr_s("xmpp:version", Attrs),
+ Version = case catch
+ list_to_float(binary_to_list(xml:get_attr_s(<<"ver">>, Attrs)))
+ of
+ {'EXIT', _} -> 0.0;
+ V -> V
+ end,
+ XmppVersion = xml:get_attr_s(<<"xmpp:version">>, Attrs),
?DEBUG("Create session: ~p", [Sid]),
mnesia:dirty_write(
#http_bind{id = Sid,
@@ -309,16 +327,8 @@ handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,
%%----------------------------------------------------------------------
init([Sid, Key, IP]) ->
?DEBUG("started: ~p", [{Sid, Key, IP}]),
-
- %% Read c2s options from the first ejabberd_c2s configuration in
- %% the config file listen section
- %% TODO: We should have different access and shaper values for
- %% each connector. The default behaviour should be however to use
- %% the default c2s restrictions if not defined for the current
- %% connector.
Opts1 = ejabberd_c2s_config:get_c2s_limits(),
Opts = [{xml_socket, true} | Opts1],
-
Shaper = none,
ShaperState = shaper:new(Shaper),
Socket = {http_bind, self(), IP},
@@ -340,22 +350,21 @@ init([Sid, Key, IP]) ->
%%----------------------------------------------------------------------
handle_event({become_controller, C2SPid}, StateName, StateData) ->
case StateData#state.input of
- cancel ->
- {next_state, StateName, StateData#state{
- waiting_input = C2SPid}};
- Input ->
- lists:foreach(
- fun(Event) ->
- C2SPid ! Event
- end, queue:to_list(Input)),
- {next_state, StateName, StateData#state{
- input = queue:new(),
- waiting_input = C2SPid}}
+ cancel ->
+ {next_state, StateName,
+ StateData#state{waiting_input = C2SPid}};
+ Input ->
+ lists:foreach(fun (Event) -> C2SPid ! Event end,
+ queue:to_list(Input)),
+ {next_state, StateName,
+ StateData#state{input = queue:new(),
+ waiting_input = C2SPid}}
end;
-
-handle_event({change_shaper, Shaper}, StateName, StateData) ->
+handle_event({change_shaper, Shaper}, StateName,
+ StateData) ->
NewShaperState = shaper:new(Shaper),
- {next_state, StateName, StateData#state{shaper_state = NewShaperState}};
+ {next_state, StateName,
+ StateData#state{shaper_state = NewShaperState}};
handle_event(_Event, StateName, StateData) ->
{next_state, StateName, StateData}.
@@ -372,13 +381,16 @@ handle_sync_event({send_xml, Packet}, _From, StateName,
#state{http_receiver = undefined} = StateData) ->
Output = [Packet | StateData#state.output],
Reply = ok,
- {reply, Reply, StateName, StateData#state{output = Output}};
+ {reply, Reply, StateName,
+ StateData#state{output = Output}};
handle_sync_event({send_xml, Packet}, _From, StateName,
#state{out_of_order_receiver = true} = StateData) ->
Output = [Packet | StateData#state.output],
Reply = ok,
- {reply, Reply, StateName, StateData#state{output = Output}};
-handle_sync_event({send_xml, Packet}, _From, StateName, StateData) ->
+ {reply, Reply, StateName,
+ StateData#state{output = Output}};
+handle_sync_event({send_xml, Packet}, _From, StateName,
+ StateData) ->
Output = [Packet | StateData#state.output],
cancel_timer(StateData#state.timer),
Timer = set_inactivity_timer(StateData#state.pause,
@@ -387,19 +399,14 @@ handle_sync_event({send_xml, Packet}, _From, StateName, StateData) ->
gen_fsm:reply(StateData#state.http_receiver, HTTPReply),
cancel_timer(StateData#state.wait_timer),
Rid = StateData#state.rid,
- ReqList = [#hbr{rid = Rid,
- key = StateData#state.key,
- out = Output
- } |
- [El || El <- StateData#state.req_list,
- El#hbr.rid /= Rid ]
- ],
+ ReqList = [#hbr{rid = Rid, key = StateData#state.key,
+ out = Output}
+ | [El
+ || El <- StateData#state.req_list, El#hbr.rid /= Rid]],
Reply = ok,
{reply, Reply, StateName,
- StateData#state{output = [],
- http_receiver = undefined,
- req_list = ReqList,
- wait_timer = undefined,
+ StateData#state{output = [], http_receiver = undefined,
+ req_list = ReqList, wait_timer = undefined,
timer = Timer}};
handle_sync_event({stop,close}, _From, _StateName, StateData) ->
@@ -412,32 +419,30 @@ handle_sync_event({stop,Reason}, _From, _StateName, StateData) ->
?DEBUG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]),
Reply = ok,
{stop, normal, Reply, StateData};
-
%% HTTP PUT: Receive packets from the client
-handle_sync_event(#http_put{rid = Rid},
- _From, StateName, StateData)
- when StateData#state.shaper_timer /= undefined ->
- Pause =
- case erlang:read_timer(StateData#state.shaper_timer) of
- false ->
- 0;
- P -> P
- end,
+handle_sync_event(#http_put{rid = Rid}, _From,
+ StateName, StateData)
+ when StateData#state.shaper_timer /= undefined ->
+ Pause = case
+ erlang:read_timer(StateData#state.shaper_timer)
+ of
+ false -> 0;
+ P -> P
+ end,
Reply = {wait, Pause},
?DEBUG("Shaper timer for RID ~p: ~p", [Rid, Reply]),
{reply, Reply, StateName, StateData};
-
-handle_sync_event(#http_put{payload_size = PayloadSize} = Request,
+handle_sync_event(#http_put{payload_size =
+ PayloadSize} =
+ Request,
_From, StateName, StateData) ->
- ?DEBUG("New request: ~p",[Request]),
- %% Updating trafic shaper
+ ?DEBUG("New request: ~p", [Request]),
{NewShaperState, NewShaperTimer} =
- update_shaper(StateData#state.shaper_state, PayloadSize),
-
+ update_shaper(StateData#state.shaper_state,
+ PayloadSize),
handle_http_put_event(Request, StateName,
StateData#state{shaper_state = NewShaperState,
shaper_timer = NewShaperTimer});
-
%% HTTP GET: send packets to the client
handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) ->
%% setup timer
@@ -490,14 +495,13 @@ handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) ->
req_list = ReqList}}
end
end;
-
-handle_sync_event(peername, _From, StateName, StateData) ->
+handle_sync_event(peername, _From, StateName,
+ StateData) ->
Reply = {ok, StateData#state.ip},
{reply, Reply, StateName, StateData};
-
-handle_sync_event(_Event, _From, StateName, StateData) ->
- Reply = ok,
- {reply, Reply, StateName, StateData}.
+handle_sync_event(_Event, _From, StateName,
+ StateData) ->
+ Reply = ok, {reply, Reply, StateName, StateData}.
code_change(_OldVsn, StateName, StateData, _Extra) ->
{ok, StateName, StateData}.
@@ -510,35 +514,30 @@ code_change(_OldVsn, StateName, StateData, _Extra) ->
%%----------------------------------------------------------------------
%% We reached the max_inactivity timeout:
handle_info({timeout, Timer, _}, _StateName,
- #state{id=SID, timer = Timer} = StateData) ->
- ?INFO_MSG("Session timeout. Closing the HTTP bind session: ~p", [SID]),
+ #state{id = SID, timer = Timer} = StateData) ->
+ ?INFO_MSG("Session timeout. Closing the HTTP bind "
+ "session: ~p",
+ [SID]),
{stop, normal, StateData};
-
handle_info({timeout, WaitTimer, _}, StateName,
#state{wait_timer = WaitTimer} = StateData) ->
- if
- StateData#state.http_receiver /= undefined ->
- cancel_timer(StateData#state.timer),
- Timer = set_inactivity_timer(StateData#state.pause,
- StateData#state.max_inactivity),
- gen_fsm:reply(StateData#state.http_receiver, {ok, empty}),
- Rid = StateData#state.rid,
- ReqList = [#hbr{rid = Rid,
- key = StateData#state.key,
- out = []
- } |
- [El || El <- StateData#state.req_list,
- El#hbr.rid /= Rid ]
- ],
- {next_state, StateName,
- StateData#state{http_receiver = undefined,
- req_list = ReqList,
- wait_timer = undefined,
- timer = Timer}};
- true ->
- {next_state, StateName, StateData}
+ if StateData#state.http_receiver /= undefined ->
+ cancel_timer(StateData#state.timer),
+ Timer = set_inactivity_timer(StateData#state.pause,
+ StateData#state.max_inactivity),
+ gen_fsm:reply(StateData#state.http_receiver,
+ {ok, empty}),
+ Rid = StateData#state.rid,
+ ReqList = [#hbr{rid = Rid, key = StateData#state.key,
+ out = []}
+ | [El
+ || El <- StateData#state.req_list, El#hbr.rid /= Rid]],
+ {next_state, StateName,
+ StateData#state{http_receiver = undefined,
+ req_list = ReqList, wait_timer = undefined,
+ timer = Timer}};
+ true -> {next_state, StateName, StateData}
end;
-
handle_info({timeout, ShaperTimer, _}, StateName,
#state{shaper_timer = ShaperTimer} = StateData) ->
{next_state, StateName, StateData#state{shaper_timer = undefined}};
@@ -552,14 +551,14 @@ handle_info(_, StateName, StateData) ->
%% Returns: any
%%----------------------------------------------------------------------
terminate(_Reason, _StateName, StateData) ->
- ?DEBUG("terminate: Deleting session ~s", [StateData#state.id]),
+ ?DEBUG("terminate: Deleting session ~s",
+ [StateData#state.id]),
mnesia:dirty_delete({http_bind, StateData#state.id}),
- send_receiver_reply(StateData#state.http_receiver, {ok, terminate}),
+ send_receiver_reply(StateData#state.http_receiver,
+ {ok, terminate}),
case StateData#state.waiting_input of
- false ->
- ok;
- C2SPid ->
- gen_fsm:send_event(C2SPid, closed)
+ false -> ok;
+ C2SPid -> gen_fsm:send_event(C2SPid, closed)
end,
ok.
@@ -568,250 +567,227 @@ terminate(_Reason, _StateName, StateData) ->
%%%----------------------------------------------------------------------
%% PUT / Get processing:
-handle_http_put_event(#http_put{rid = Rid, attrs = Attrs,
- hold = Hold} = Request,
+handle_http_put_event(#http_put{rid = Rid,
+ attrs = Attrs, hold = Hold} =
+ Request,
StateName, StateData) ->
- ?DEBUG("New request: ~p",[Request]),
- %% Check if Rid valid
- RidAllow = rid_allow(StateData#state.rid, Rid, Attrs, Hold,
- StateData#state.max_pause),
-
- %% Check if Rid is in sequence or out of sequence:
+ ?DEBUG("New request: ~p", [Request]),
+ RidAllow = rid_allow(StateData#state.rid, Rid, Attrs,
+ Hold, StateData#state.max_pause),
case RidAllow of
- buffer ->
- ?DEBUG("Buffered request: ~p", [Request]),
- %% Request is out of sequence:
- PendingRequests = StateData#state.unprocessed_req_list,
- %% In case an existing RID was already buffered:
- Requests = lists:keydelete(Rid, 2, PendingRequests),
- ReqList = [#hbr{rid = Rid,
- key = StateData#state.key,
- out = []
- } |
- [El || El <- StateData#state.req_list,
- El#hbr.rid > (Rid - 1 - Hold)]
- ],
- ?DEBUG("reqlist: ~p", [ReqList]),
- UnprocessedReqList = [Request | Requests],
- cancel_timer(StateData#state.timer),
- Timer = set_inactivity_timer(0, StateData#state.max_inactivity),
- {reply, ok, StateName,
- StateData#state{unprocessed_req_list = UnprocessedReqList,
- req_list = ReqList,
- timer = Timer}, hibernate};
-
- _ ->
- %% Request is in sequence:
- process_http_put(Request, StateName, StateData, RidAllow)
+ buffer ->
+ ?DEBUG("Buffered request: ~p", [Request]),
+ PendingRequests = StateData#state.unprocessed_req_list,
+ Requests = lists:keydelete(Rid, 2, PendingRequests),
+ ReqList = [#hbr{rid = Rid, key = StateData#state.key,
+ out = []}
+ | [El
+ || El <- StateData#state.req_list,
+ El#hbr.rid > Rid - 1 - Hold]],
+ ?DEBUG("reqlist: ~p", [ReqList]),
+ UnprocessedReqList = [Request | Requests],
+ cancel_timer(StateData#state.timer),
+ Timer = set_inactivity_timer(0,
+ StateData#state.max_inactivity),
+ {reply, ok, StateName,
+ StateData#state{unprocessed_req_list =
+ UnprocessedReqList,
+ req_list = ReqList, timer = Timer},
+ hibernate};
+ _ ->
+ process_http_put(Request, StateName, StateData,
+ RidAllow)
end.
-process_http_put(#http_put{rid = Rid, attrs = Attrs, payload = Payload,
- hold = Hold, stream = StreamTo,
- ip = IP} = Request,
+process_http_put(#http_put{rid = Rid, attrs = Attrs,
+ payload = Payload, hold = Hold, stream = StreamTo,
+ ip = IP} =
+ Request,
StateName, StateData, RidAllow) ->
?DEBUG("Actually processing request: ~p", [Request]),
- %% Check if key valid
- Key = xml:get_attr_s("key", Attrs),
- NewKey = xml:get_attr_s("newkey", Attrs),
- KeyAllow =
- case RidAllow of
- repeat ->
- true;
- false ->
- false;
- {true, _} ->
- case StateData#state.key of
- "" ->
- true;
- OldKey ->
- NextKey = sha:sha(Key),
- ?DEBUG("Key/OldKey/NextKey: ~s/~s/~s", [Key, OldKey, NextKey]),
- if
- OldKey == NextKey ->
- true;
- true ->
- ?DEBUG("wrong key: ~s",[Key]),
- false
- end
- end
- end,
+ Key = xml:get_attr_s(<<"key">>, Attrs),
+ NewKey = xml:get_attr_s(<<"newkey">>, Attrs),
+ KeyAllow = case RidAllow of
+ repeat -> true;
+ false -> false;
+ {true, _} ->
+ case StateData#state.key of
+ <<"">> -> true;
+ OldKey ->
+ NextKey = sha:sha(Key),
+ ?DEBUG("Key/OldKey/NextKey: ~s/~s/~s",
+ [Key, OldKey, NextKey]),
+ if OldKey == NextKey -> true;
+ true -> ?DEBUG("wrong key: ~s", [Key]), false
+ end
+ end
+ end,
TNow = tnow(),
- LastPoll = if
- Payload == [] ->
- TNow;
- true ->
- 0
+ LastPoll = if Payload == [] -> TNow;
+ true -> 0
end,
- if
- (Payload == []) and
- (Hold == 0) and
- (TNow - StateData#state.last_poll < ?MIN_POLLING) ->
- Reply = {error, polling_too_frequently},
- {reply, Reply, StateName, StateData};
- KeyAllow ->
- case RidAllow of
- false ->
- Reply = {error, not_exists},
- {reply, Reply, StateName, StateData};
- repeat ->
- ?DEBUG("REPEATING ~p", [Rid]),
- case [El#hbr.out ||
- El <- StateData#state.req_list,
- El#hbr.rid == Rid] of
- [] ->
- {error, not_exists};
- [Out | _XS] ->
- if (Rid == StateData#state.rid) and
- (StateData#state.http_receiver /= undefined) ->
- {reply, ok, StateName, StateData};
- true ->
- Reply = {repeat, lists:reverse(Out)},
- {reply, Reply, StateName, StateData#state{last_poll = LastPoll}}
- end
- end;
- {true, Pause} ->
- SaveKey = if
- NewKey == "" ->
- Key;
- true ->
- NewKey
- end,
- ?DEBUG(" -- SaveKey: ~s~n", [SaveKey]),
-
- %% save request
- ReqList1 =
- [El || El <- StateData#state.req_list,
- El#hbr.rid > (Rid - 1 - Hold)],
- ReqList =
- case lists:keymember(Rid, #hbr.rid, ReqList1) of
- true ->
- ReqList1;
- false ->
- [#hbr{rid = Rid,
- key = StateData#state.key,
- out = []
- } |
- ReqList1
- ]
- end,
- ?DEBUG("reqlist: ~p", [ReqList]),
-
- %% setup next timer
- cancel_timer(StateData#state.timer),
- Timer = set_inactivity_timer(Pause,
- StateData#state.max_inactivity),
- case StateData#state.waiting_input of
- false ->
- Input =
- lists:foldl(
- fun queue:in/2,
- StateData#state.input, Payload),
- Reply = ok,
- process_buffered_request(Reply, StateName,
- StateData#state{input = Input,
- rid = Rid,
- key = SaveKey,
- ctime = TNow,
- timer = Timer,
- pause = Pause,
- last_poll = LastPoll,
- req_list = ReqList,
- ip = IP
- });
- C2SPid ->
- case StreamTo of
- {To, ""} ->
- gen_fsm:send_event(
- C2SPid,
- {xmlstreamstart, "stream:stream",
- [{"to", To},
- {"xmlns", ?NS_CLIENT},
- {"xmlns:stream", ?NS_STREAM}]});
- {To, Version} ->
- gen_fsm:send_event(
- C2SPid,
- {xmlstreamstart, "stream:stream",
- [{"to", To},
- {"xmlns", ?NS_CLIENT},
- {"version", Version},
- {"xmlns:stream", ?NS_STREAM}]});
- _ ->
- ok
- end,
-
- MaxInactivity = get_max_inactivity(StreamTo, StateData#state.max_inactivity),
- MaxPause = get_max_inactivity(StreamTo, StateData#state.max_pause),
-
- ?DEBUG("really sending now: ~p", [Payload]),
- lists:foreach(
- fun({xmlstreamend, End}) ->
- gen_fsm:send_event(
- C2SPid, {xmlstreamend, End});
- (El) ->
- gen_fsm:send_event(
- C2SPid, {xmlstreamelement, El})
- end, Payload),
- Reply = ok,
- process_buffered_request(Reply, StateName,
- StateData#state{input = queue:new(),
- rid = Rid,
- key = SaveKey,
- ctime = TNow,
- timer = Timer,
- pause = Pause,
- last_poll = LastPoll,
- req_list = ReqList,
- max_inactivity = MaxInactivity,
- max_pause = MaxPause,
- ip = IP
- })
- end
- end;
- true ->
- Reply = {error, bad_key},
- {reply, Reply, StateName, StateData}
+ if (Payload == []) and (Hold == 0) and
+ (TNow - StateData#state.last_poll < (?MIN_POLLING)) ->
+ Reply = {error, polling_too_frequently},
+ {reply, Reply, StateName, StateData};
+ KeyAllow ->
+ case RidAllow of
+ false ->
+ Reply = {error, not_exists},
+ {reply, Reply, StateName, StateData};
+ repeat ->
+ ?DEBUG("REPEATING ~p", [Rid]),
+ case [El#hbr.out
+ || El <- StateData#state.req_list, El#hbr.rid == Rid]
+ of
+ [] -> {error, not_exists};
+ [Out | _XS] ->
+ if (Rid == StateData#state.rid) and
+ (StateData#state.http_receiver /= undefined) ->
+ {reply, ok, StateName, StateData};
+ true ->
+ Reply = {repeat, lists:reverse(Out)},
+ {reply, Reply, StateName,
+ StateData#state{last_poll = LastPoll}}
+ end
+ end;
+ {true, Pause} ->
+ SaveKey = if NewKey == <<"">> -> Key;
+ true -> NewKey
+ end,
+ ?DEBUG(" -- SaveKey: ~s~n", [SaveKey]),
+ ReqList1 = [El
+ || El <- StateData#state.req_list,
+ El#hbr.rid > Rid - 1 - Hold],
+ ReqList = case lists:keymember(Rid, #hbr.rid, ReqList1)
+ of
+ true -> ReqList1;
+ false ->
+ [#hbr{rid = Rid, key = StateData#state.key,
+ out = []}
+ | ReqList1]
+ end,
+ ?DEBUG("reqlist: ~p", [ReqList]),
+ cancel_timer(StateData#state.timer),
+ Timer = set_inactivity_timer(Pause,
+ StateData#state.max_inactivity),
+ case StateData#state.waiting_input of
+ false ->
+ Input = lists:foldl(fun queue:in/2,
+ StateData#state.input, Payload),
+ Reply = ok,
+ process_buffered_request(Reply, StateName,
+ StateData#state{input = Input,
+ rid = Rid,
+ key = SaveKey,
+ ctime = TNow,
+ timer = Timer,
+ pause = Pause,
+ last_poll =
+ LastPoll,
+ req_list =
+ ReqList,
+ ip = IP});
+ C2SPid ->
+ case StreamTo of
+ {To, <<"">>} ->
+ gen_fsm:send_event(C2SPid,
+ {xmlstreamstart,
+ <<"stream:stream">>,
+ [{<<"to">>, To},
+ {<<"xmlns">>, ?NS_CLIENT},
+ {<<"xmlns:stream">>,
+ ?NS_STREAM}]});
+ {To, Version} ->
+ gen_fsm:send_event(C2SPid,
+ {xmlstreamstart,
+ <<"stream:stream">>,
+ [{<<"to">>, To},
+ {<<"xmlns">>, ?NS_CLIENT},
+ {<<"version">>, Version},
+ {<<"xmlns:stream">>,
+ ?NS_STREAM}]});
+ _ -> ok
+ end,
+ MaxInactivity = get_max_inactivity(StreamTo,
+ StateData#state.max_inactivity),
+ MaxPause = get_max_inactivity(StreamTo,
+ StateData#state.max_pause),
+ ?DEBUG("really sending now: ~p", [Payload]),
+ lists:foreach(fun ({xmlstreamend, End}) ->
+ gen_fsm:send_event(C2SPid,
+ {xmlstreamend,
+ End});
+ (El) ->
+ gen_fsm:send_event(C2SPid,
+ {xmlstreamelement,
+ El})
+ end,
+ Payload),
+ Reply = ok,
+ process_buffered_request(Reply, StateName,
+ StateData#state{input =
+ queue:new(),
+ rid = Rid,
+ key = SaveKey,
+ ctime = TNow,
+ timer = Timer,
+ pause = Pause,
+ last_poll =
+ LastPoll,
+ req_list =
+ ReqList,
+ max_inactivity =
+ MaxInactivity,
+ max_pause =
+ MaxPause,
+ ip = IP})
+ end
+ end;
+ true ->
+ Reply = {error, bad_key},
+ {reply, Reply, StateName, StateData}
end.
process_buffered_request(Reply, StateName, StateData) ->
Rid = StateData#state.rid,
Requests = StateData#state.unprocessed_req_list,
- case lists:keysearch(Rid+1, 2, Requests) of
- {value, Request} ->
- ?DEBUG("Processing buffered request: ~p", [Request]),
- NewRequests = lists:keydelete(Rid+1, 2, Requests),
- handle_http_put_event(
- Request, StateName,
- StateData#state{unprocessed_req_list = NewRequests});
- _ ->
- {reply, Reply, StateName, StateData, hibernate}
+ case lists:keysearch(Rid + 1, 2, Requests) of
+ {value, Request} ->
+ ?DEBUG("Processing buffered request: ~p", [Request]),
+ NewRequests = lists:keydelete(Rid + 1, 2, Requests),
+ handle_http_put_event(Request, StateName,
+ StateData#state{unprocessed_req_list =
+ NewRequests});
+ _ -> {reply, Reply, StateName, StateData, hibernate}
end.
-handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
- case http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) of
- {error, not_exists} ->
- ?DEBUG("no session associated with sid: ~p", [Sid]),
- {404, ?HEADER, ""};
- {{error, Reason}, Sess} ->
- ?DEBUG("Error on HTTP put. Reason: ~p", [Reason]),
- handle_http_put_error(Reason, Sess);
- {{repeat, OutPacket}, Sess} ->
- ?DEBUG("http_put said 'repeat!' ...~nOutPacket: ~p", [OutPacket]),
- send_outpacket(Sess, OutPacket);
- {{wait, Pause}, _Sess} ->
- ?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]),
- timer:sleep(Pause),
- %{200, ?HEADER,
- % xml:element_to_binary(
- % {xmlelement, "body",
- % [{"xmlns", ?NS_HTTP_BIND},
- % {"type", "error"}], []})};
- handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize,
- StreamStart, IP);
- {ok, Sess} ->
- prepare_response(Sess, Rid, [], StreamStart)
+handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize,
+ StreamStart, IP) ->
+ case http_put(Sid, Rid, Attrs, Payload, PayloadSize,
+ StreamStart, IP)
+ of
+ {error, not_exists} ->
+ ?DEBUG("no session associated with sid: ~p", [Sid]),
+ {404, ?HEADER, <<"">>};
+ {{error, Reason}, Sess} ->
+ ?DEBUG("Error on HTTP put. Reason: ~p", [Reason]),
+ handle_http_put_error(Reason, Sess);
+ {{repeat, OutPacket}, Sess} ->
+ ?DEBUG("http_put said 'repeat!' ...~nOutPacket: ~p",
+ [OutPacket]),
+ send_outpacket(Sess, OutPacket);
+ {{wait, Pause}, _Sess} ->
+ ?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]),
+ timer:sleep(Pause),
+ handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize,
+ StreamStart, IP);
+ {ok, Sess} ->
+ prepare_response(Sess, Rid, [], StreamStart)
end.
-http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
+http_put(Sid, Rid, Attrs, Payload, PayloadSize,
+ StreamStart, IP) ->
?DEBUG("Looking for session: ~p", [Sid]),
case mnesia:dirty_read({http_bind, Sid}) of
[] ->
@@ -822,7 +798,7 @@ http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
true ->
{To, StreamVersion};
_ ->
- ""
+ <<"">>
end,
{gen_fsm:sync_send_all_state_event(
FsmRef, #http_put{rid = Rid, attrs = Attrs, payload = Payload,
@@ -830,425 +806,430 @@ http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
stream = NewStream, ip = IP}, 30000), Sess}
end.
-handle_http_put_error(Reason, #http_bind{pid=FsmRef, version=Version})
- when Version >= 0 ->
- gen_fsm:sync_send_all_state_event(FsmRef, {stop, {put_error,Reason}}),
+handle_http_put_error(Reason,
+ #http_bind{pid = FsmRef, version = Version})
+ when Version >= 0 ->
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {stop, {put_error, Reason}}),
case Reason of
- not_exists ->
- {200, ?HEADER,
- xml:element_to_binary(
- {xmlelement, "body",
- [{"xmlns", ?NS_HTTP_BIND},
- {"type", "terminate"},
- {"condition", "item-not-found"}], []})};
- bad_key ->
- {200, ?HEADER,
- xml:element_to_binary(
- {xmlelement, "body",
- [{"xmlns", ?NS_HTTP_BIND},
- {"type", "terminate"},
- {"condition", "item-not-found"}], []})};
- polling_too_frequently ->
- {200, ?HEADER,
- xml:element_to_binary(
- {xmlelement, "body",
- [{"xmlns", ?NS_HTTP_BIND},
- {"type", "terminate"},
- {"condition", "policy-violation"}], []})}
+ not_exists ->
+ {200, ?HEADER,
+ xml:element_to_binary(#xmlel{name = <<"body">>,
+ attrs =
+ [{<<"xmlns">>, ?NS_HTTP_BIND},
+ {<<"type">>, <<"terminate">>},
+ {<<"condition">>,
+ <<"item-not-found">>}],
+ children = []})};
+ bad_key ->
+ {200, ?HEADER,
+ xml:element_to_binary(#xmlel{name = <<"body">>,
+ attrs =
+ [{<<"xmlns">>, ?NS_HTTP_BIND},
+ {<<"type">>, <<"terminate">>},
+ {<<"condition">>,
+ <<"item-not-found">>}],
+ children = []})};
+ polling_too_frequently ->
+ {200, ?HEADER,
+ xml:element_to_binary(#xmlel{name = <<"body">>,
+ attrs =
+ [{<<"xmlns">>, ?NS_HTTP_BIND},
+ {<<"type">>, <<"terminate">>},
+ {<<"condition">>,
+ <<"policy-violation">>}],
+ children = []})}
end;
-handle_http_put_error(Reason, #http_bind{pid=FsmRef}) ->
- gen_fsm:sync_send_all_state_event(FsmRef,{stop, {put_error_no_version, Reason}}),
+handle_http_put_error(Reason,
+ #http_bind{pid = FsmRef}) ->
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {stop, {put_error_no_version, Reason}}),
case Reason of
- not_exists -> %% bad rid
- ?DEBUG("Closing HTTP bind session (Bad rid).", []),
- {404, ?HEADER, ""};
- bad_key ->
- ?DEBUG("Closing HTTP bind session (Bad key).", []),
- {404, ?HEADER, ""};
- polling_too_frequently ->
- ?DEBUG("Closing HTTP bind session (User polling too frequently).", []),
- {403, ?HEADER, ""}
+ not_exists -> %% bad rid
+ ?DEBUG("Closing HTTP bind session (Bad rid).", []),
+ {404, ?HEADER, <<"">>};
+ bad_key ->
+ ?DEBUG("Closing HTTP bind session (Bad key).", []),
+ {404, ?HEADER, <<"">>};
+ polling_too_frequently ->
+ ?DEBUG("Closing HTTP bind session (User polling "
+ "too frequently).",
+ []),
+ {403, ?HEADER, <<"">>}
end.
%% Control RID ordering
rid_allow(none, _NewRid, _Attrs, _Hold, _MaxPause) ->
- %% First request - nothing saved so far
{true, 0};
rid_allow(OldRid, NewRid, Attrs, Hold, MaxPause) ->
- ?DEBUG("Previous rid / New rid: ~p/~p", [OldRid, NewRid]),
+ ?DEBUG("Previous rid / New rid: ~p/~p",
+ [OldRid, NewRid]),
if
- %% We did not miss any packet, we can process it immediately:
- NewRid == OldRid + 1 ->
- case catch list_to_integer(
- xml:get_attr_s("pause", Attrs)) of
- {'EXIT', _} ->
- {true, 0};
- Pause1 when Pause1 =< MaxPause ->
- ?DEBUG("got pause: ~p", [Pause1]),
- {true, Pause1};
- _ ->
- {true, 0}
- end;
- %% We have missed packets, we need to cached it to process it later on:
- (OldRid < NewRid) and
- (NewRid =< (OldRid + Hold + 1)) ->
- buffer;
- (NewRid =< OldRid) and
- (NewRid > OldRid - Hold - 1) ->
- repeat;
- true ->
- false
+ %% We did not miss any packet, we can process it immediately:
+ NewRid == OldRid + 1 ->
+ case catch
+ jlib:binary_to_integer(xml:get_attr_s(<<"pause">>,
+ Attrs))
+ of
+ {'EXIT', _} -> {true, 0};
+ Pause1 when Pause1 =< MaxPause ->
+ ?DEBUG("got pause: ~p", [Pause1]), {true, Pause1};
+ _ -> {true, 0}
+ end;
+ %% We have missed packets, we need to cached it to process it later on:
+ (OldRid < NewRid) and (NewRid =< OldRid + Hold + 1) ->
+ buffer;
+ (NewRid =< OldRid) and (NewRid > OldRid - Hold - 1) ->
+ repeat;
+ true -> false
end.
update_shaper(ShaperState, PayloadSize) ->
- {NewShaperState, Pause} = shaper:update(ShaperState, PayloadSize),
- if
- Pause > 0 ->
- ShaperTimer = erlang:start_timer(Pause, self(), activate), %% MR: Seems timer is not needed. Activate is not handled
- {NewShaperState, ShaperTimer};
- true ->
- {NewShaperState, undefined}
+ {NewShaperState, Pause} = shaper:update(ShaperState,
+ PayloadSize),
+ if Pause > 0 ->
+ ShaperTimer = erlang:start_timer(Pause, self(),
+ activate),
+ {NewShaperState, ShaperTimer};
+ true -> {NewShaperState, undefined}
end.
prepare_response(Sess, Rid, OutputEls, StreamStart) ->
- receive after Sess#http_bind.process_delay -> ok end,
+ receive after Sess#http_bind.process_delay -> ok end,
case catch http_get(Sess, Rid) of
- {ok, cancel} ->
- %% actually it would be better if we could completely
- %% cancel this request, but then we would have to hack
- %% ejabberd_http and I'm too lazy now
- {200, ?HEADER, "<body type='error' xmlns='"++?NS_HTTP_BIND++"'/>"};
- {ok, empty} ->
- {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"};
- {ok, terminate} ->
- {200, ?HEADER, "<body type='terminate' xmlns='"++?NS_HTTP_BIND++"'/>"};
- {ok, ROutPacket} ->
- OutPacket = lists:reverse(ROutPacket),
- ?DEBUG("OutPacket: ~p", [OutputEls++OutPacket]),
- prepare_outpacket_response(Sess, Rid, OutputEls++OutPacket, StreamStart);
- {'EXIT', {shutdown, _}} ->
- {200, ?HEADER, "<body type='terminate' condition='system-shutdown' xmlns='"++?NS_HTTP_BIND++"'/>"};
- {'EXIT', _Reason} ->
- {200, ?HEADER, "<body type='terminate' xmlns='"++?NS_HTTP_BIND++"'/>"}
+ {ok, cancel} ->
+ {200, ?HEADER,
+ <<"<body type='error' xmlns='", (?NS_HTTP_BIND)/binary,
+ "'/>">>};
+ {ok, empty} ->
+ {200, ?HEADER,
+ <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'/>">>};
+ {ok, terminate} ->
+ {200, ?HEADER,
+ <<"<body type='terminate' xmlns='",
+ (?NS_HTTP_BIND)/binary, "'/>">>};
+ {ok, ROutPacket} ->
+ OutPacket = lists:reverse(ROutPacket),
+ ?DEBUG("OutPacket: ~p", [OutputEls ++ OutPacket]),
+ prepare_outpacket_response(Sess, Rid,
+ OutputEls ++ OutPacket, StreamStart);
+ {'EXIT', {shutdown, _}} ->
+ {200, ?HEADER,
+ <<"<body type='terminate' condition='system-shut"
+ "down' xmlns='",
+ (?NS_HTTP_BIND)/binary, "'/>">>};
+ {'EXIT', _Reason} ->
+ {200, ?HEADER,
+ <<"<body type='terminate' xmlns='",
+ (?NS_HTTP_BIND)/binary, "'/>">>}
end.
%% Send output payloads on establised sessions
-prepare_outpacket_response(Sess, _Rid, OutPacket, false) ->
+prepare_outpacket_response(Sess, _Rid, OutPacket,
+ false) ->
case catch send_outpacket(Sess, OutPacket) of
- {'EXIT', _Reason} ->
- ?DEBUG("Error in sending packet ~p ", [_Reason]),
- {200, ?HEADER,
- "<body type='terminate' xmlns='"++
- ?NS_HTTP_BIND++"'/>"};
- SendRes ->
- SendRes
+ {'EXIT', _Reason} ->
+ ?DEBUG("Error in sending packet ~p ", [_Reason]),
+ {200, ?HEADER,
+ <<"<body type='terminate' xmlns='",
+ (?NS_HTTP_BIND)/binary, "'/>">>};
+ SendRes -> SendRes
end;
%% Handle a new session along with its output payload
-prepare_outpacket_response(#http_bind{id=Sid, wait=Wait,
- hold=Hold, to=To}=_Sess,
- _Rid, OutPacket, true) ->
+prepare_outpacket_response(#http_bind{id = Sid,
+ wait = Wait, hold = Hold, to = To} =
+ _Sess,
+ _Rid, OutPacket, true) ->
case OutPacket of
- [{xmlstreamstart, _, OutAttrs} | Els] ->
- AuthID = xml:get_attr_s("id", OutAttrs),
- From = xml:get_attr_s("from", OutAttrs),
- Version = xml:get_attr_s("version", OutAttrs),
- OutEls =
- case Els of
- [] ->
- [];
- [{xmlstreamelement,
- {xmlelement, "stream:features",
- StreamAttribs, StreamEls}}
- | StreamTail] ->
- TypedTail =
- [check_default_xmlns(OEl) ||
- {xmlstreamelement, OEl} <-
- StreamTail],
- [{xmlelement, "stream:features",
- [{"xmlns:stream",
- ?NS_STREAM}] ++
- StreamAttribs, StreamEls}] ++
- TypedTail;
- StreamTail ->
- [check_default_xmlns(OEl) ||
- {xmlstreamelement, OEl} <-
- StreamTail]
- end,
- case OutEls of
- [{xmlelement,
- "stream:error",_,_}] ->
- {200, ?HEADER, "<body type='terminate' "
- "condition='host-unknown' "
- "xmlns='"++?NS_HTTP_BIND++"'/>"};
- _ ->
- BOSH_attribs =
- [{"authid", AuthID},
- {"xmlns:xmpp", ?NS_BOSH},
- {"xmlns:stream", ?NS_STREAM}] ++
- case OutEls of
- [] ->
- [];
- _ ->
- [{"xmpp:version", Version}]
- end,
- MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY),
- MaxPause = get_max_pause(To),
- {200, ?HEADER,
- xml:element_to_binary(
- {xmlelement,"body",
- [{"xmlns",
- ?NS_HTTP_BIND},
- {"sid", Sid},
- {"wait", integer_to_list(Wait)},
- {"requests", integer_to_list(Hold+1)},
- {"inactivity",
- integer_to_list(
- trunc(MaxInactivity/1000))},
- {"maxpause",
- integer_to_list(MaxPause)},
- {"polling",
- integer_to_list(
- trunc(?MIN_POLLING/1000000))},
- {"ver", ?BOSH_VERSION},
- {"from", From},
- {"secure", "true"} %% we're always being secure
- ] ++ BOSH_attribs,OutEls})}
- end;
- _ ->
- {200, ?HEADER, "<body type='terminate' "
- "condition='internal-server-error' "
- "xmlns='"++?NS_HTTP_BIND++"'/>"}
+ [{xmlstreamstart, _, OutAttrs} | Els] ->
+ AuthID = xml:get_attr_s(<<"id">>, OutAttrs),
+ From = xml:get_attr_s(<<"from">>, OutAttrs),
+ Version = xml:get_attr_s(<<"version">>, OutAttrs),
+ OutEls = case Els of
+ [] -> [];
+ [{xmlstreamelement,
+ #xmlel{name = <<"stream:features">>,
+ attrs = StreamAttribs, children = StreamEls}}
+ | StreamTail] ->
+ TypedTail = [check_default_xmlns(OEl)
+ || {xmlstreamelement, OEl} <- StreamTail],
+ [#xmlel{name = <<"stream:features">>,
+ attrs =
+ [{<<"xmlns:stream">>, ?NS_STREAM}] ++
+ StreamAttribs,
+ children = StreamEls}]
+ ++ TypedTail;
+ StreamTail ->
+ [check_default_xmlns(OEl)
+ || {xmlstreamelement, OEl} <- StreamTail]
+ end,
+ case OutEls of
+ [#xmlel{name = <<"stream:error">>}] ->
+ {200, ?HEADER,
+ <<"<body type='terminate' condition='host-unknow"
+ "n' xmlns='",
+ (?NS_HTTP_BIND)/binary, "'/>">>};
+ _ ->
+ BOSH_attribs = [{<<"authid">>, AuthID},
+ {<<"xmlns:xmpp">>, ?NS_BOSH},
+ {<<"xmlns:stream">>, ?NS_STREAM}]
+ ++
+ case OutEls of
+ [] -> [];
+ _ -> [{<<"xmpp:version">>, Version}]
+ end,
+ MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY),
+ MaxPause = get_max_pause(To),
+ {200, ?HEADER,
+ xml:element_to_binary(#xmlel{name = <<"body">>,
+ attrs =
+ [{<<"xmlns">>, ?NS_HTTP_BIND},
+ {<<"sid">>, Sid},
+ {<<"wait">>,
+ iolist_to_binary(integer_to_list(Wait))},
+ {<<"requests">>,
+ iolist_to_binary(integer_to_list(Hold
+ +
+ 1))},
+ {<<"inactivity">>,
+ iolist_to_binary(integer_to_list(trunc(MaxInactivity
+ /
+ 1000)))},
+ {<<"maxpause">>,
+ iolist_to_binary(integer_to_list(MaxPause))},
+ {<<"polling">>,
+ iolist_to_binary(integer_to_list(trunc((?MIN_POLLING)
+ /
+ 1000000)))},
+ {<<"ver">>, ?BOSH_VERSION},
+ {<<"from">>, From},
+ {<<"secure">>, <<"true">>}]
+ ++ BOSH_attribs,
+ children = OutEls})}
+ end;
+ _ ->
+ {200, ?HEADER,
+ <<"<body type='terminate' condition='internal-se"
+ "rver-error' xmlns='",
+ (?NS_HTTP_BIND)/binary, "'/>">>}
end.
-
-http_get(#http_bind{pid = FsmRef, wait = Wait, hold = Hold}, Rid) ->
- gen_fsm:sync_send_all_state_event(
- FsmRef, {http_get, Rid, Wait, Hold}, 2 * ?MAX_WAIT * 1000).
+http_get(#http_bind{pid = FsmRef, wait = Wait,
+ hold = Hold},
+ Rid) ->
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {http_get, Rid, Wait, Hold},
+ 2 * (?MAX_WAIT) * 1000).
send_outpacket(#http_bind{pid = FsmRef}, OutPacket) ->
case OutPacket of
- [] ->
- {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"};
- [{xmlstreamend, _}] ->
- gen_fsm:sync_send_all_state_event(FsmRef,{stop,stream_closed}),
- {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"};
- _ ->
- %% TODO: We parse to add a default namespace to packet,
- %% The spec says adding the jabber:client namespace if
- %% mandatory, even if some implementation do not do that
- %% change on packets.
- %% I think this should be an option to avoid modifying
- %% packet in most case.
- AllElements =
- lists:all(fun({xmlstreamelement,
- {xmlelement, "stream:error", _, _}}) -> false;
- ({xmlstreamelement, _}) -> true;
- ({xmlstreamraw, _}) -> true;
- (_) -> false
- end, OutPacket),
- case AllElements of
- true ->
- TypedEls = lists:foldl(fun({xmlstreamelement, El}, Acc) ->
- Acc ++
- [xml:element_to_string(
- check_default_xmlns(El)
- )];
- ({xmlstreamraw, R}, Acc) ->
- Acc ++ [R]
- end,
- [],
- OutPacket),
-
- Body = "<body xmlns='"++?NS_HTTP_BIND++"'>"
- ++ TypedEls ++
- "</body>",
- ?DEBUG(" --- outgoing data --- ~n~s~n --- END --- ~n",
- [Body]),
- {200, ?HEADER, Body};
- false ->
- case OutPacket of
- [{xmlstreamstart, _, _} | SEls] ->
- OutEls =
- case SEls of
- [{xmlstreamelement,
- {xmlelement,
- "stream:features",
- StreamAttribs, StreamEls}} |
- StreamTail] ->
- TypedTail =
- [check_default_xmlns(OEl) ||
- {xmlstreamelement, OEl} <-
- StreamTail],
- [{xmlelement,
- "stream:features",
- [{"xmlns:stream",
- ?NS_STREAM}] ++
- StreamAttribs, StreamEls}] ++
- TypedTail;
- StreamTail ->
- [check_default_xmlns(OEl) ||
- {xmlstreamelement, OEl} <-
- StreamTail]
- end,
- {200, ?HEADER,
- xml:element_to_binary(
- {xmlelement,"body",
- [{"xmlns",
- ?NS_HTTP_BIND}],
- OutEls})};
+ [] ->
+ {200, ?HEADER,
+ <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'/>">>};
+ [{xmlstreamend, _}] ->
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {stop, stream_closed}),
+ {200, ?HEADER,
+ <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'/>">>};
+ _ ->
+ AllElements = lists:all(fun ({xmlstreamelement,
+ #xmlel{name = <<"stream:error">>}}) ->
+ false;
+ ({xmlstreamelement, _}) -> true;
+ ({xmlstreamraw, _}) -> true;
+ (_) -> false
+ end,
+ OutPacket),
+ case AllElements of
+ true ->
+ TypedEls = lists:foldl(fun ({xmlstreamelement, El},
+ Acc) ->
+ Acc ++
+ [xml:element_to_binary(check_default_xmlns(El))];
+ ({xmlstreamraw, R}, Acc) ->
+ Acc ++ [R]
+ end,
+ [], OutPacket),
+ Body = <<"<body xmlns='", (?NS_HTTP_BIND)/binary, "'>",
+ (iolist_to_binary(TypedEls))/binary, "</body>">>,
+ ?DEBUG(" --- outgoing data --- ~n~s~n --- END "
+ "--- ~n",
+ [Body]),
+ {200, ?HEADER, Body};
+ false ->
+ case OutPacket of
+ [{xmlstreamstart, _, _} | SEls] ->
+ OutEls = case SEls of
+ [{xmlstreamelement,
+ #xmlel{name = <<"stream:features">>,
+ attrs = StreamAttribs,
+ children = StreamEls}}
+ | StreamTail] ->
+ TypedTail = [check_default_xmlns(OEl)
+ || {xmlstreamelement, OEl}
+ <- StreamTail],
+ [#xmlel{name = <<"stream:features">>,
+ attrs =
+ [{<<"xmlns:stream">>,
+ ?NS_STREAM}]
+ ++ StreamAttribs,
+ children = StreamEls}]
+ ++ TypedTail;
+ StreamTail ->
+ [check_default_xmlns(OEl)
+ || {xmlstreamelement, OEl} <- StreamTail]
+ end,
+ {200, ?HEADER,
+ xml:element_to_binary(#xmlel{name = <<"body">>,
+ attrs =
+ [{<<"xmlns">>,
+ ?NS_HTTP_BIND}],
+ children = OutEls})};
+ _ ->
+ SErrCond = lists:filter(fun ({xmlstreamelement,
+ #xmlel{name =
+ <<"stream:error">>}}) ->
+ true;
+ (_) -> false
+ end,
+ OutPacket),
+ StreamErrCond = case SErrCond of
+ [] -> null;
+ [{xmlstreamelement,
+ #xmlel{} = StreamErrorTag}
+ | _] ->
+ [StreamErrorTag]
+ end,
+ gen_fsm:sync_send_all_state_event(FsmRef,
+ {stop,
+ {stream_error,
+ OutPacket}}),
+ case StreamErrCond of
+ null ->
+ {200, ?HEADER,
+ <<"<body type='terminate' condition='internal-se"
+ "rver-error' xmlns='",
+ (?NS_HTTP_BIND)/binary, "'/>">>};
_ ->
- SErrCond =
- lists:filter(
- fun({xmlstreamelement,
- {xmlelement, "stream:error",
- _, _}}) ->
- true;
- (_) -> false
- end, OutPacket),
- StreamErrCond =
- case SErrCond of
- [] ->
- null;
- [{xmlstreamelement,
- {xmlelement, _, _, _Cond} =
- StreamErrorTag} | _] ->
- [StreamErrorTag]
- end,
- gen_fsm:sync_send_all_state_event(FsmRef,
- {stop, {stream_error,OutPacket}}),
- case StreamErrCond of
- null ->
- {200, ?HEADER,
- "<body type='terminate' "
- "condition='internal-server-error' "
- "xmlns='"++?NS_HTTP_BIND++"'/>"};
- _ ->
- {200, ?HEADER,
- "<body type='terminate' "
- "condition='remote-stream-error' "
- "xmlns='"++?NS_HTTP_BIND++"' " ++
- "xmlns:stream='"++?NS_STREAM++"'>" ++
- elements_to_string(StreamErrCond) ++
- "</body>"}
- end
- end
- end
+ {200, ?HEADER,
+ <<"<body type='terminate' condition='remote-stre"
+ "am-error' xmlns='",
+ (?NS_HTTP_BIND)/binary, "' ", "xmlns:stream='",
+ (?NS_STREAM)/binary, "'>",
+ (elements_to_string(StreamErrCond))/binary,
+ "</body>">>}
+ end
+ end
+ end
end.
parse_request(Data, PayloadSize, MaxStanzaSize) ->
- ?DEBUG("--- incoming data --- ~n~s~n --- END --- ", [Data]),
- %% MR: I do not think it works if put put several elements in the
- %% same body:
+ ?DEBUG("--- incoming data --- ~n~s~n --- END "
+ "--- ",
+ [Data]),
case xml_stream:parse_element(Data) of
- {xmlelement, "body", Attrs, Els} ->
- Xmlns = xml:get_attr_s("xmlns",Attrs),
- if
- Xmlns /= ?NS_HTTP_BIND ->
- {error, bad_request};
- true ->
- case catch list_to_integer(xml:get_attr_s("rid", Attrs)) of
- {'EXIT', _} ->
- {error, bad_request};
- Rid ->
- %% I guess this is to remove XMLCDATA: Is it really needed ?
- FixedEls =
- lists:filter(
- fun(I) ->
- case I of
- {xmlelement, _, _, _} ->
- true;
- _ ->
- false
- end
- end, Els),
- Sid = xml:get_attr_s("sid",Attrs),
- if
- PayloadSize =< MaxStanzaSize ->
- {ok, {Sid, Rid, Attrs, FixedEls}};
- true ->
- {size_limit, Sid}
- end
- end
- end;
- {xmlelement, _Name, _Attrs, _Els} ->
- {error, bad_request};
- {error, _Reason} ->
- {error, bad_request}
+ #xmlel{name = <<"body">>, attrs = Attrs,
+ children = Els} ->
+ Xmlns = xml:get_attr_s(<<"xmlns">>, Attrs),
+ if Xmlns /= (?NS_HTTP_BIND) -> {error, bad_request};
+ true ->
+ case catch
+ jlib:binary_to_integer(xml:get_attr_s(<<"rid">>,
+ Attrs))
+ of
+ {'EXIT', _} -> {error, bad_request};
+ Rid ->
+ FixedEls = lists:filter(fun (I) ->
+ case I of
+ #xmlel{} -> true;
+ _ -> false
+ end
+ end,
+ Els),
+ Sid = xml:get_attr_s(<<"sid">>, Attrs),
+ if PayloadSize =< MaxStanzaSize ->
+ {ok, {Sid, Rid, Attrs, FixedEls}};
+ true -> {size_limit, Sid}
+ end
+ end
+ end;
+ #xmlel{} -> {error, bad_request};
+ {error, _Reason} -> {error, bad_request}
end.
-send_receiver_reply(undefined, _Reply) ->
- ok;
+send_receiver_reply(undefined, _Reply) -> ok;
send_receiver_reply(Receiver, Reply) ->
gen_fsm:reply(Receiver, Reply).
-
%% Cancel timer and empty message queue.
-cancel_timer(undefined) ->
- ok;
+cancel_timer(undefined) -> ok;
cancel_timer(Timer) ->
erlang:cancel_timer(Timer),
- receive
- {timeout, Timer, _} ->
- ok
- after 0 ->
- ok
- end.
+ receive {timeout, Timer, _} -> ok after 0 -> ok end.
%% If client asked for a pause (pause > 0), we apply the pause value
%% as inactivity timer:
-set_inactivity_timer(Pause, _MaxInactivity) when Pause > 0 ->
- erlang:start_timer(Pause*1000, self(), []);
+set_inactivity_timer(Pause, _MaxInactivity)
+ when Pause > 0 ->
+ erlang:start_timer(Pause * 1000, self(), []);
%% Otherwise, we apply the max_inactivity value as inactivity timer:
set_inactivity_timer(_Pause, MaxInactivity) ->
erlang:start_timer(MaxInactivity, self(), []).
-
%% TODO: Use tail recursion and list reverse ?
-elements_to_string([]) ->
- [];
+elements_to_string([]) -> [];
elements_to_string([El | Els]) ->
- [xml:element_to_binary(El)|elements_to_string(Els)].
+ [xml:element_to_binary(El) | elements_to_string(Els)].
%% @spec (To, Default::integer()) -> integer()
%% where To = [] | {Host::string(), Version::string()}
get_max_inactivity({Host, _}, Default) ->
- case gen_mod:get_module_opt(Host, mod_http_bind, max_inactivity, undefined) of
- Seconds when is_integer(Seconds) ->
- Seconds * 1000;
- undefined ->
- Default
+ case gen_mod:get_module_opt(Host, mod_http_bind, max_inactivity,
+ fun(I) when is_integer(I), I>0 -> I end,
+ undefined)
+ of
+ Seconds when is_integer(Seconds) -> Seconds * 1000;
+ undefined -> Default
end;
-get_max_inactivity(_, Default) ->
- Default.
+get_max_inactivity(_, Default) -> Default.
get_max_pause({Host, _}) ->
- gen_mod:get_module_opt(Host, mod_http_bind, max_pause, ?MAX_PAUSE);
-get_max_pause(_) ->
- ?MAX_PAUSE.
+ gen_mod:get_module_opt(Host, mod_http_bind, max_pause,
+ fun(I) when is_integer(I), I>0 -> I end,
+ ?MAX_PAUSE);
+get_max_pause(_) -> ?MAX_PAUSE.
%% Current time as integer
tnow() ->
{TMegSec, TSec, TMSec} = now(),
(TMegSec * 1000000 + TSec) * 1000000 + TMSec.
-check_default_xmlns({xmlelement, Name, Attrs, Els} = El) ->
- case xml:get_tag_attr_s("xmlns", El) of
- "" -> {xmlelement, Name, [{"xmlns", ?NS_CLIENT} | Attrs], Els};
- _ -> El
+check_default_xmlns(#xmlel{name = Name, attrs = Attrs,
+ children = Els} =
+ El) ->
+ case xml:get_tag_attr_s(<<"xmlns">>, El) of
+ <<"">> ->
+ #xmlel{name = Name,
+ attrs = [{<<"xmlns">>, ?NS_CLIENT} | Attrs],
+ children = Els};
+ _ -> El
end;
-check_default_xmlns(El) ->
- El.
+check_default_xmlns(El) -> El.
%% Check that mod_http_bind has been defined in config file.
%% Print a warning in log file if this is not the case.
check_bind_module(XmppDomain) ->
case gen_mod:is_loaded(XmppDomain, mod_http_bind) of
- true -> ok;
- false -> ?ERROR_MSG("You are trying to use BOSH (HTTP Bind) in host ~p,"
- " but the module mod_http_bind is not started in"
- " that host. Configure your BOSH client to connect"
- " to the correct host, or add your desired host to"
- " the configuration, or check your 'modules'"
- " section in your ejabberd configuration file.",
- [XmppDomain])
+ true -> true;
+ false ->
+ ?ERROR_MSG("You are trying to use BOSH (HTTP Bind) "
+ "in host ~p, but the module mod_http_bind "
+ "is not started in that host. Configure "
+ "your BOSH client to connect to the correct "
+ "host, or add your desired host to the "
+ "configuration, or check your 'modules' "
+ "section in your ejabberd configuration "
+ "file.",
+ [XmppDomain]),
+ false
end.