aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_receiver.erl
blob: 204771c1a1400cf4c0e33c7091d66ad005d94369 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
%%%----------------------------------------------------------------------
%%% File    : ejabberd_receiver.erl
%%% Author  : Alexey Shchepin <alexey@sevcom.net>
%%% Purpose : Socket receiver for C2S and S2S connections
%%% Created : 10 Nov 2003 by Alexey Shchepin <alexey@sevcom.net>
%%% Id      : $Id$
%%%----------------------------------------------------------------------

-module(ejabberd_receiver).
-author('alexey@sevcom.net').
-vsn('$Revision$ ').

-export([start/3,
	 receiver/4,
	 change_shaper/2,
	 reset_stream/1,
	 starttls/2]).

-include("ejabberd.hrl").


start(Socket, SockMod, Shaper) ->
    proc_lib:spawn(?MODULE, receiver, [Socket, SockMod, Shaper, self()]).


receiver(Socket, SockMod, Shaper, C2SPid) ->
    XMLStreamState = xml_stream:new(C2SPid),
    ShaperState = shaper:new(Shaper),
    Timeout = case SockMod of
		  ssl ->
		      20;
		  _ ->
		      infinity
	      end,
    receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout).

receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout) ->
    Res = (catch SockMod:recv(Socket, 0, Timeout)),
    receive
	{starttls, TLSSocket} ->
	    xml_stream:close(XMLStreamState),
	    XMLStreamState1 = xml_stream:new(C2SPid),
	    TLSRes = case Res of
			 {ok, Data} ->
			     tls:recv_data(TLSSocket, Data);
			 _ ->
			     tls:recv_data(TLSSocket, "")
		     end,
	    receiver1(TLSSocket, tls,
		      ShaperState, C2SPid, XMLStreamState1, Timeout,
		      TLSRes);
	{change_timeout, NewTimeout} -> % Dirty hack
	    receiver1(Socket, SockMod,
		      ShaperState, C2SPid, XMLStreamState, NewTimeout,
		      Res)
    after 0 ->
	    receiver1(Socket, SockMod,
		      ShaperState, C2SPid, XMLStreamState, Timeout,
		      Res)
    end.


receiver1(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout, Res) ->
    case Res of
        {ok, Text} ->
	    ShaperSt1 = receive
			    {change_shaper, Shaper} ->
				shaper:new(Shaper)
			after 0 ->
				ShaperState
			end,
	    NewShaperState = shaper:update(ShaperSt1, size(Text)),
	    XMLStreamState1 = receive
				  reset_stream ->
				      xml_stream:close(XMLStreamState),
				      xml_stream:new(C2SPid)
			      after 0 ->
				      XMLStreamState
			      end,
	    XMLStreamState2 = xml_stream:parse(XMLStreamState1, Text),
	    receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamState2,
		     Timeout);
	{error, timeout} ->
	    receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState,
		     Timeout);
        {error, Reason} ->
	    xml_stream:close(XMLStreamState),
	    gen_fsm:send_event(C2SPid, closed),
	    ok;
	{'EXIT', Reason} ->
	    ?ERROR_MSG("(~w) abnormal ~w:recv termination:~n\t~p~n",
		       [Socket, SockMod, Reason]),
	    xml_stream:close(XMLStreamState),
	    gen_fsm:send_event(C2SPid, closed),
	    ok
    end.


change_shaper(Pid, Shaper) ->
    Pid ! {change_shaper, Shaper}.

reset_stream(Pid) ->
    Pid ! reset_stream.

starttls(Pid, TLSSocket) ->
    Pid ! {starttls, TLSSocket}.