aboutsummaryrefslogtreecommitdiff
path: root/src/mod_proxy65/mod_proxy65_stream.erl
diff options
context:
space:
mode:
authorAlexey Shchepin <alexey@process-one.net>2006-10-28 02:04:55 +0000
committerAlexey Shchepin <alexey@process-one.net>2006-10-28 02:04:55 +0000
commit399d170a784aa6be19f183a4b2cc40620a8f6caa (patch)
treeba99dd1ff8b260166e912e02e39937151f5dadb1 /src/mod_proxy65/mod_proxy65_stream.erl
parent* src/guide.tex: Fixed typos. (diff)
* src/mod_proxy65/: XEP-0065 proxy (thanks to Evgeniy Khramtsov)
* src/Makefile.win32: Likewise * src/Makefile.in: Likewise * src/configure.ac: Likewise * src/jlib.hrl: Likewise * src/ejabberd.hrl: Added the ejabberd URL SVN Revision: 666
Diffstat (limited to '')
-rw-r--r--src/mod_proxy65/mod_proxy65_stream.erl273
1 files changed, 273 insertions, 0 deletions
diff --git a/src/mod_proxy65/mod_proxy65_stream.erl b/src/mod_proxy65/mod_proxy65_stream.erl
new file mode 100644
index 000000000..1f4a75dd8
--- /dev/null
+++ b/src/mod_proxy65/mod_proxy65_stream.erl
@@ -0,0 +1,273 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_proxy65_stream.erl
+%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
+%%% Purpose : Bytestream process.
+%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
+%%% Id : $Id$
+%%%----------------------------------------------------------------------
+
+-module(mod_proxy65_stream).
+-author('xram@jabber.ru').
+
+-behaviour(gen_fsm).
+
+%% gen_fsm callbacks.
+-export([
+ init/1,
+ handle_event/3,
+ handle_sync_event/4,
+ code_change/4,
+ handle_info/3,
+ terminate/3
+ ]).
+
+%% gen_fsm states.
+-export([
+ wait_for_init/2,
+ wait_for_auth/2,
+ wait_for_request/2,
+ wait_for_activation/2,
+ stream_established/2
+ ]).
+
+%% API.
+-export([
+ start/2,
+ stop/1,
+ start_link/3,
+ activate/2,
+ relay/3,
+ socket_type/0
+ ]).
+
+-include("mod_proxy65.hrl").
+-include("../ejabberd.hrl").
+
+-define(WAIT_TIMEOUT, 60000). %% 1 minute (is it enough?)
+
+-record(state, {
+ socket, %% TCP socket
+ timer, %% timer reference
+ sha1, %% SHA1 key
+ host, %% virtual host
+ auth_type, %% authentication type: anonymous or plain
+ shaper, %% Shaper name
+ active = false %% Activity flag
+ }).
+
+%% Unused callbacks
+handle_event(_Event, StateName, StateData) ->
+ {next_state, StateName, StateData}.
+code_change(_OldVsn, StateName, StateData, _Extra) ->
+ {ok, StateName, StateData}.
+%%-------------------------------
+
+start({gen_tcp, Socket}, [Host | Opts]) ->
+ Supervisor = gen_mod:get_module_proc(Host, ejabberd_mod_proxy65_sup),
+ supervisor:start_child(Supervisor, [Socket, Host, Opts]).
+
+start_link(Socket, Host, Opts) ->
+ gen_fsm:start_link(?MODULE, [Socket, Host, Opts], []).
+
+init([Socket, Host, Opts]) ->
+ process_flag(trap_exit, true),
+ AuthType = gen_mod:get_opt(auth_type, Opts, anonymous),
+ Shaper = gen_mod:get_opt(shaper, Opts, none),
+ RecvBuf = gen_mod:get_opt(recbuf, Opts, 65535),
+ SendBuf = gen_mod:get_opt(sndbuf, Opts, 65535),
+ TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop),
+ inet:setopts(Socket, [{active, true}, {recbuf, RecvBuf}, {sndbuf, SendBuf}]),
+ {ok, wait_for_init, #state{host = Host,
+ auth_type = AuthType,
+ socket = Socket,
+ shaper = Shaper,
+ timer = TRef}}.
+
+terminate(_Reason, _StateName, #state{sha1=SHA1,active=Flag}) ->
+ catch mod_proxy65_sm:unregister_stream(SHA1),
+ if Flag==true ->
+ ?INFO_MSG("Bytestream terminated", []);
+ true ->
+ ok
+ end.
+
+%%%------------------------------
+%%% API.
+%%%------------------------------
+socket_type() ->
+ raw.
+
+stop(StreamPid) ->
+ StreamPid ! stop.
+
+activate({P1, J1}, {P2, J2}) ->
+ case catch {gen_fsm:sync_send_all_state_event(P1, get_socket),
+ gen_fsm:sync_send_all_state_event(P2, get_socket)} of
+ {S1, S2} when is_port(S1), is_port(S2) ->
+ P1 ! {activate, P2, S2, J1, J2},
+ P2 ! {activate, P1, S1, J1, J2},
+ JID1 = jlib:jid_to_string(J1),
+ JID2 = jlib:jid_to_string(J2),
+ ?INFO_MSG("(~w:~w) Activated bytestream for ~s -> ~s", [P1, P2, JID1, JID2]),
+ ok;
+ _ ->
+ error
+ end.
+
+%%%-----------------------
+%%% States
+%%%-----------------------
+wait_for_init(Packet, #state{socket=Socket, auth_type=AuthType} = StateData) ->
+ case mod_proxy65_lib:unpack_init_message(Packet) of
+ {ok, AuthMethods} ->
+ Method = select_auth_method(AuthType, AuthMethods),
+ gen_tcp:send(Socket, mod_proxy65_lib:make_init_reply(Method)),
+ case Method of
+ ?AUTH_ANONYMOUS ->
+ {next_state, wait_for_request, StateData};
+ ?AUTH_PLAIN ->
+ {next_state, wait_for_auth, StateData};
+ ?AUTH_NO_METHODS ->
+ {stop, normal, StateData}
+ end;
+ error ->
+ {stop, normal, StateData}
+ end.
+
+wait_for_auth(Packet, #state{socket=Socket, host=Host} = StateData) ->
+ case mod_proxy65_lib:unpack_auth_request(Packet) of
+ {User, Pass} ->
+ Result = ejabberd_auth:check_password(User, Host, Pass),
+ gen_tcp:send(Socket, mod_proxy65_lib:make_auth_reply(Result)),
+ case Result of
+ true ->
+ {next_state, wait_for_request, StateData};
+ false ->
+ {stop, normal, StateData}
+ end;
+ _ ->
+ {stop, normal, StateData}
+ end.
+
+wait_for_request(Packet, #state{socket=Socket} = StateData) ->
+ Request = mod_proxy65_lib:unpack_request(Packet),
+ case Request of
+ #s5_request{sha1=SHA1, cmd=connect} ->
+ case catch mod_proxy65_sm:register_stream(SHA1) of
+ {atomic, ok} ->
+ inet:setopts(Socket, [{active, false}]),
+ gen_tcp:send(Socket, mod_proxy65_lib:make_reply()),
+ {next_state, wait_for_activation, StateData#state{sha1=SHA1}};
+ _ ->
+ Err = mod_proxy65_lib:make_error_reply(Request),
+ gen_tcp:send(Socket, Err),
+ {stop, normal, StateData}
+ end;
+ #s5_request{cmd=udp} ->
+ Err = mod_proxy65_lib:make_error_reply(Request, ?ERR_COMMAND_NOT_SUPPORTED),
+ gen_tcp:send(Socket, Err),
+ {stop, normal, StateData};
+ _ ->
+ {stop, normal, StateData}
+ end.
+
+wait_for_activation(_Data, StateData) ->
+ {next_state, wait_for_activation, StateData}.
+
+stream_established(_Data, StateData) ->
+ {next_state, stream_established, StateData}.
+
+%%%-----------------------
+%%% Callbacks processing
+%%%-----------------------
+
+%% SOCKS5 packets.
+handle_info({tcp, _S, Data}, StateName, StateData)
+ when StateName /= wait_for_activation ->
+ erlang:cancel_timer(StateData#state.timer),
+ TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop),
+ gen_fsm:send_event(self(), Data),
+ {next_state, StateName, StateData#state{timer=TRef}};
+
+%% Activation message.
+handle_info({activate, PeerPid, PeerSocket, IJid, TJid},
+ wait_for_activation, StateData) ->
+ erlang:monitor(process, PeerPid),
+ erlang:cancel_timer(StateData#state.timer),
+ MySocket = StateData#state.socket,
+ Shaper = StateData#state.shaper,
+ Host = StateData#state.host,
+ MaxRate = find_maxrate(Shaper, IJid, TJid, Host),
+ spawn_link(?MODULE, relay, [MySocket, PeerSocket, MaxRate]),
+ {next_state, stream_established, StateData#state{active=true}};
+
+%% Socket closed
+handle_info({tcp_closed, _Socket}, _StateName, StateData) ->
+ {stop, normal, StateData};
+handle_info({tcp_error, _Socket, _Reason}, _StateName, StateData) ->
+ {stop, normal, StateData};
+
+%% Got stop message.
+handle_info(stop, _StateName, StateData) ->
+ {stop, normal, StateData};
+
+%% Either linked process or peer process died.
+handle_info({'EXIT',_,_}, _StateName, StateData) ->
+ {stop, normal, StateData};
+handle_info({'DOWN',_,_,_,_}, _StateName, StateData) ->
+ {stop, normal, StateData};
+
+%% Packets of no interest
+handle_info(_Info, StateName, StateData) ->
+ {next_state, StateName, StateData}.
+
+%% Socket request.
+handle_sync_event(get_socket, _From, wait_for_activation, StateData) ->
+ Socket = StateData#state.socket,
+ {reply, Socket, wait_for_activation, StateData};
+
+handle_sync_event(_Event, _From, StateName, StateData) ->
+ {reply, error, StateName, StateData}.
+
+%%%-------------------------------------------------
+%%% Relay Process.
+%%%-------------------------------------------------
+relay(MySocket, PeerSocket, Shaper) ->
+ case gen_tcp:recv(MySocket, 0) of
+ {ok, Data} ->
+ gen_tcp:send(PeerSocket, Data),
+ {NewShaper, Pause} = shaper:update(Shaper, size(Data)),
+ if
+ Pause > 0 -> timer:sleep(Pause);
+ true -> pass
+ end,
+ relay(MySocket, PeerSocket, NewShaper);
+ _ ->
+ stopped
+ end.
+
+%%%------------------------
+%%% Auxiliary functions
+%%%------------------------
+select_auth_method(plain, AuthMethods) ->
+ case lists:member(?AUTH_PLAIN, AuthMethods) of
+ true -> ?AUTH_PLAIN;
+ false -> ?AUTH_NO_METHODS
+ end;
+
+select_auth_method(anonymous, AuthMethods) ->
+ case lists:member(?AUTH_ANONYMOUS, AuthMethods) of
+ true -> ?AUTH_ANONYMOUS;
+ false -> ?AUTH_NO_METHODS
+ end.
+
+%% Obviously, we must use shaper with maximum rate.
+find_maxrate(Shaper, JID1, JID2, Host) ->
+ MaxRate1 = shaper:new(acl:match_rule(Host, Shaper, JID1)),
+ MaxRate2 = shaper:new(acl:match_rule(Host, Shaper, JID2)),
+ if
+ MaxRate1 == none; MaxRate2 == none ->
+ none;
+ true ->
+ lists:max([MaxRate1, MaxRate2])
+ end.