summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ejabberd_listener.erl3
-rw-r--r--src/ejabberd_receiver.erl42
-rw-r--r--src/xml_stream.erl31
3 files changed, 52 insertions, 24 deletions
diff --git a/src/ejabberd_listener.erl b/src/ejabberd_listener.erl
index 2ea422c3..6e84b65e 100644
--- a/src/ejabberd_listener.erl
+++ b/src/ejabberd_listener.erl
@@ -70,7 +70,8 @@ init(Port, Module, Opts) ->
{packet, 0},
{active, false},
{reuseaddr, true},
- {nodelay, true} |
+ {nodelay, true},
+ {keepalive, true} |
SockOpts]),
case Res of
{ok, ListenSocket} ->
diff --git a/src/ejabberd_receiver.erl b/src/ejabberd_receiver.erl
index 8217bb7e..1f1897fb 100644
--- a/src/ejabberd_receiver.erl
+++ b/src/ejabberd_receiver.erl
@@ -24,7 +24,7 @@ start(Socket, SockMod, Shaper) ->
receiver(Socket, SockMod, Shaper, C2SPid) ->
- XMLStreamPid = xml_stream:start(self(), C2SPid),
+ XMLStreamState = xml_stream:new(C2SPid),
ShaperState = shaper:new(Shaper),
Timeout = case SockMod of
ssl ->
@@ -32,32 +32,32 @@ receiver(Socket, SockMod, Shaper, C2SPid) ->
_ ->
infinity
end,
- receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout).
+ receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout).
-receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout) ->
+receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout) ->
Res = (catch SockMod:recv(Socket, 0, Timeout)),
case Res of
{ok, Data} ->
receive
{starttls, TLSSocket} ->
- exit(XMLStreamPid, closed),
- XMLStreamPid1 = xml_stream:start(self(), C2SPid),
+ xml_stream:close(XMLStreamState),
+ XMLStreamState1 = xml_stream:new(C2SPid),
TLSRes = tls:recv_data(TLSSocket, Data),
receiver1(TLSSocket, tls,
- ShaperState, C2SPid, XMLStreamPid1, Timeout,
+ ShaperState, C2SPid, XMLStreamState1, Timeout,
TLSRes)
after 0 ->
receiver1(Socket, SockMod,
- ShaperState, C2SPid, XMLStreamPid, Timeout,
+ ShaperState, C2SPid, XMLStreamState, Timeout,
Res)
end;
_ ->
receiver1(Socket, SockMod,
- ShaperState, C2SPid, XMLStreamPid, Timeout, Res)
+ ShaperState, C2SPid, XMLStreamState, Timeout, Res)
end.
-receiver1(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout, Res) ->
+receiver1(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout, Res) ->
case Res of
{ok, Text} ->
ShaperSt1 = receive
@@ -67,27 +67,27 @@ receiver1(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout, Res) ->
ShaperState
end,
NewShaperState = shaper:update(ShaperSt1, size(Text)),
- XMLStreamPid1 = receive
- reset_stream ->
- exit(XMLStreamPid, closed),
- xml_stream:start(self(), C2SPid)
- after 0 ->
- XMLStreamPid
- end,
- xml_stream:send_text(XMLStreamPid1, Text),
- receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamPid1,
+ XMLStreamState1 = receive
+ reset_stream ->
+ xml_stream:close(XMLStreamState),
+ xml_stream:new(C2SPid)
+ after 0 ->
+ XMLStreamState
+ end,
+ XMLStreamState2 = xml_stream:parse(XMLStreamState1, Text),
+ receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamState2,
Timeout);
{error, timeout} ->
- receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid,
+ receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState,
Timeout);
{error, Reason} ->
- exit(XMLStreamPid, closed),
+ xml_stream:close(XMLStreamState),
gen_fsm:send_event(C2SPid, closed),
ok;
{'EXIT', Reason} ->
?ERROR_MSG("(~w) abnormal ~w:recv termination:~n\t~p~n",
[Socket, SockMod, Reason]),
- exit(XMLStreamPid, closed),
+ xml_stream:close(XMLStreamState),
gen_fsm:send_event(C2SPid, closed),
ok
end.
diff --git a/src/xml_stream.erl b/src/xml_stream.erl
index 8a892bf3..d5fbd937 100644
--- a/src/xml_stream.erl
+++ b/src/xml_stream.erl
@@ -1,7 +1,7 @@
%%%----------------------------------------------------------------------
%%% File : xml_stream.erl
%%% Author : Alexey Shchepin <alexey@sevcom.net>
-%%% Purpose :
+%%% Purpose : Parse XML streams
%%% Created : 17 Nov 2002 by Alexey Shchepin <alexey@sevcom.net>
%%% Id : $Id$
%%%----------------------------------------------------------------------
@@ -10,7 +10,12 @@
-author('alexey@sevcom.net').
-vsn('$Revision$ ').
--export([start/1, start/2, init/1, init/2, send_text/2]).
+-export([start/1, start/2,
+ init/1, init/2,
+ send_text/2,
+ new/1,
+ parse/2,
+ close/1]).
-define(XML_START, 0).
-define(XML_END, 1).
@@ -19,6 +24,8 @@
-define(PARSE_COMMAND, 0).
+-record(xml_stream_state, {callback_pid, port, stack}).
+
start(CallbackPid) ->
spawn(?MODULE, init, [CallbackPid]).
@@ -96,3 +103,23 @@ process_data(CallbackPid, Stack, Data) ->
send_text(Pid, Text) ->
Pid ! {self(), {send, Text}}.
+
+new(CallbackPid) ->
+ Port = open_port({spawn, expat_erl}, [binary]),
+ #xml_stream_state{callback_pid = CallbackPid,
+ port = Port,
+ stack = []}.
+
+
+parse(#xml_stream_state{callback_pid = CallbackPid,
+ port = Port,
+ stack = Stack} = State, Str) ->
+ Res = port_control(Port, ?PARSE_COMMAND, Str),
+ NewStack = lists:foldl(
+ fun(Data, St) ->
+ process_data(CallbackPid, St, Data)
+ end, Stack, binary_to_term(Res)),
+ State#xml_stream_state{stack = NewStack}.
+
+close(#xml_stream_state{port = Port}) ->
+ port_close(Port).