summaryrefslogtreecommitdiff
path: root/src/mod_proxy65
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/mod_proxy65/Makefile.in33
-rw-r--r--src/mod_proxy65/Makefile.win3232
-rw-r--r--src/mod_proxy65/mod_proxy65.erl57
-rw-r--r--src/mod_proxy65/mod_proxy65.hrl46
-rw-r--r--src/mod_proxy65/mod_proxy65_lib.erl69
-rw-r--r--src/mod_proxy65/mod_proxy65_service.erl198
-rw-r--r--src/mod_proxy65/mod_proxy65_sm.erl162
-rw-r--r--src/mod_proxy65/mod_proxy65_stream.erl273
8 files changed, 870 insertions, 0 deletions
diff --git a/src/mod_proxy65/Makefile.in b/src/mod_proxy65/Makefile.in
new file mode 100644
index 00000000..671b1258
--- /dev/null
+++ b/src/mod_proxy65/Makefile.in
@@ -0,0 +1,33 @@
+# $Id$
+
+CC = @CC@
+CFLAGS = @CFLAGS@ @ERLANG_CFLAGS@
+CPPFLAGS = @CPPFLAGS@
+LDFLAGS = @LDFLAGS@
+LIBS = @LIBS@ @ERLANG_LIBS@
+
+SUBDIRS =
+
+OUTDIR = ..
+EFLAGS = -I .. -pz ..
+OBJS = \
+ $(OUTDIR)/mod_proxy65.beam \
+ $(OUTDIR)/mod_proxy65_service.beam \
+ $(OUTDIR)/mod_proxy65_sm.beam \
+ $(OUTDIR)/mod_proxy65_stream.beam \
+ $(OUTDIR)/mod_proxy65_lib.beam
+
+all: $(OBJS)
+
+$(OUTDIR)/%.beam: %.erl
+ @ERLC@ -W $(EFLAGS) -o $(OUTDIR) $<
+
+clean:
+ rm -f $(OBJS)
+
+distclean: clean
+ rm -f Makefile
+
+TAGS:
+ etags *.erl
+
diff --git a/src/mod_proxy65/Makefile.win32 b/src/mod_proxy65/Makefile.win32
new file mode 100644
index 00000000..fdab1329
--- /dev/null
+++ b/src/mod_proxy65/Makefile.win32
@@ -0,0 +1,32 @@
+
+include ..\Makefile.inc
+
+OUTDIR = ..
+EFLAGS = -I .. -pz ..
+
+OBJS = \
+ $(OUTDIR)\mod_proxy65.beam \
+ $(OUTDIR)\mod_proxy65_service.beam \
+ $(OUTDIR)\mod_proxy65_sm.beam \
+ $(OUTDIR)\mod_proxy65_stream.beam \
+ $(OUTDIR)\mod_proxy65_lib.beam
+
+ALL : $(OBJS)
+
+CLEAN :
+ -@erase $(OBJS)
+
+$(OUTDIR)\mod_proxy65.beam : mod_proxy65.erl
+ erlc -W $(EFLAGS) -o $(OUTDIR) mod_proxy65.erl
+
+$(OUTDIR)\mod_proxy65_service.beam : mod_proxy65_service.erl
+ erlc -W $(EFLAGS) -o $(OUTDIR) mod_proxy65_service.erl
+
+$(OUTDIR)\mod_proxy65_sm.beam : mod_proxy65_sm.erl
+ erlc -W $(EFLAGS) -o $(OUTDIR) mod_mod_proxy65_sm.erl
+
+$(OUTDIR)\mod_proxy65_stream.beam : mod_proxy65_stream.erl
+ erlc -W $(EFLAGS) -o $(OUTDIR) mod_proxy65_stream.erl
+
+$(OUTDIR)\mod_proxy65_lib.beam : mod_proxy65_lib.erl
+ erlc -W $(EFLAGS) -o $(OUTDIR) mod_proxy65_lib.erl
diff --git a/src/mod_proxy65/mod_proxy65.erl b/src/mod_proxy65/mod_proxy65.erl
new file mode 100644
index 00000000..65719241
--- /dev/null
+++ b/src/mod_proxy65/mod_proxy65.erl
@@ -0,0 +1,57 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_proxy65.erl
+%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
+%%% Purpose : Main supervisor.
+%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
+%%% Id : $Id$
+%%%----------------------------------------------------------------------
+
+-module(mod_proxy65).
+-author('xram@jabber.ru').
+
+-behaviour(gen_mod).
+-behaviour(supervisor).
+
+%% gen_mod callbacks.
+-export([start/2, stop/1]).
+
+%% supervisor callbacks.
+-export([init/1]).
+
+%% API.
+-export([start_link/2]).
+
+-define(PROCNAME, ejabberd_mod_proxy65).
+
+start(Host, Opts) ->
+ Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
+ ChildSpec = {
+ Proc, {?MODULE, start_link, [Host, Opts]},
+ transient, infinity, supervisor, [?MODULE]
+ },
+ supervisor:start_child(ejabberd_sup, ChildSpec).
+
+stop(Host) ->
+ Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
+ supervisor:terminate_child(ejabberd_sup, Proc),
+ supervisor:delete_child(ejabberd_sup, Proc).
+
+start_link(Host, Opts) ->
+ Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
+ supervisor:start_link({local, Proc}, ?MODULE, [Host, Opts]).
+
+init([Host, Opts]) ->
+ Service =
+ {mod_proxy65_service, {mod_proxy65_service, start_link, [Host, Opts]},
+ transient, 5000, worker, [mod_proxy65_service]},
+ StreamSupervisor =
+ {ejabberd_mod_proxy65_sup,
+ {ejabberd_tmp_sup, start_link,
+ [gen_mod:get_module_proc(Host, ejabberd_mod_proxy65_sup),
+ mod_proxy65_stream]},
+ transient, infinity, supervisor, [ejabberd_tmp_sup]},
+ StreamManager =
+ {mod_proxy65_sm, {mod_proxy65_sm, start_link, [Host, Opts]},
+ transient, 5000, worker, [mod_proxy65_sm]},
+ {ok, {{one_for_one, 10, 1},
+ [StreamManager, StreamSupervisor, Service]}}.
diff --git a/src/mod_proxy65/mod_proxy65.hrl b/src/mod_proxy65/mod_proxy65.hrl
new file mode 100644
index 00000000..e0191238
--- /dev/null
+++ b/src/mod_proxy65/mod_proxy65.hrl
@@ -0,0 +1,46 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_proxy65.hrl
+%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
+%%% Purpose : RFC 1928 constants.
+%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
+%%% Id : $Id$
+%%%----------------------------------------------------------------------
+
+%% Version
+-define(VERSION_5, 5).
+
+%% Authentication methods
+-define(AUTH_ANONYMOUS, 0).
+-define(AUTH_GSSAPI, 1).
+-define(AUTH_PLAIN, 2).
+-define(AUTH_NO_METHODS, 16#FF).
+
+%% Address Type
+-define(ATYP_IPV4, 1).
+-define(ATYP_DOMAINNAME, 3).
+-define(ATYP_IPV6, 4).
+
+%% Commands
+-define(CMD_CONNECT, 1).
+-define(CMD_BIND, 2).
+-define(CMD_UDP, 3).
+
+%% RFC 1928 replies
+-define(SUCCESS, 0).
+-define(ERR_GENERAL_FAILURE, 0).
+-define(ERR_NOT_ALLOWED, 2).
+-define(ERR_NETWORK_UNREACHABLE, 3).
+-define(ERR_HOST_UNREACHABLE, 4).
+-define(ERR_CONNECTION_REFUSED, 5).
+-define(ERR_TTL_EXPIRED, 6).
+-define(ERR_COMMAND_NOT_SUPPORTED, 7).
+-define(ERR_ADDRESS_TYPE_NOT_SUPPORTED, 8).
+
+%% RFC 1928 defined timeout.
+-define(SOCKS5_REPLY_TIMEOUT, 10000).
+
+-record(s5_request, {
+ rsv = 0,
+ cmd,
+ sha1
+ }).
diff --git a/src/mod_proxy65/mod_proxy65_lib.erl b/src/mod_proxy65/mod_proxy65_lib.erl
new file mode 100644
index 00000000..09ee6b98
--- /dev/null
+++ b/src/mod_proxy65/mod_proxy65_lib.erl
@@ -0,0 +1,69 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_proxy65_lib.erl
+%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
+%%% Purpose : SOCKS5 parsing library.
+%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
+%%% Id : $Id$
+%%%----------------------------------------------------------------------
+
+-module(mod_proxy65_lib).
+-author('xram@jabber.ru').
+
+-include("mod_proxy65.hrl").
+
+-export([
+ unpack_init_message/1,
+ unpack_auth_request/1,
+ unpack_request/1,
+ make_init_reply/1,
+ make_auth_reply/1,
+ make_reply/0,
+ make_error_reply/1,
+ make_error_reply/2
+ ]).
+
+unpack_init_message(<<?VERSION_5, N, AuthMethodList:N/binary>>)
+ when N > 0, N < 256 ->
+ {ok, binary_to_list(AuthMethodList)};
+
+unpack_init_message(_) ->
+ error.
+
+unpack_auth_request(<<1, ULen, User:ULen/binary,
+ PLen, Pass:PLen/binary>>) when ULen < 256, PLen < 256 ->
+ {binary_to_list(User), binary_to_list(Pass)};
+
+unpack_auth_request(_) ->
+ error.
+
+unpack_request(<<?VERSION_5, CMD, RSV,
+ ?ATYP_DOMAINNAME, 40,
+ SHA1:40/binary, 0, 0>>) when CMD == ?CMD_CONNECT;
+ CMD == ?CMD_UDP ->
+ Command = if
+ CMD == ?CMD_CONNECT -> connect;
+ CMD == ?CMD_UDP -> udp
+ end,
+ #s5_request{cmd = Command, rsv = RSV, sha1 = binary_to_list(SHA1)};
+
+unpack_request(_) ->
+ error.
+
+make_init_reply(Method) ->
+ [?VERSION_5, Method].
+
+make_auth_reply(true) -> [1, ?SUCCESS];
+make_auth_reply(false) -> [1, ?ERR_NOT_ALLOWED].
+
+%% WARNING: According to SOCKS5 RFC, this reply is _incorrect_, but
+%% Psi writes junk to the beginning of the file on correct reply.
+%% I'm not sure, but there may be an issue with other clients.
+%% Needs more testing.
+make_reply() ->
+ [?VERSION_5, ?SUCCESS, 0, 0, 0, 0].
+
+make_error_reply(Request) ->
+ make_error_reply(Request, ?ERR_NOT_ALLOWED).
+
+make_error_reply(#s5_request{rsv = RSV, sha1 = SHA1}, Reason) ->
+ [?VERSION_5, Reason, RSV, ?ATYP_DOMAINNAME, length(SHA1), SHA1, 0,0].
diff --git a/src/mod_proxy65/mod_proxy65_service.erl b/src/mod_proxy65/mod_proxy65_service.erl
new file mode 100644
index 00000000..175ac4a6
--- /dev/null
+++ b/src/mod_proxy65/mod_proxy65_service.erl
@@ -0,0 +1,198 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_proxy65_service.erl
+%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
+%%% Purpose : SOCKS5 Bytestreams XMPP service.
+%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
+%%% Id : $Id$
+%%%----------------------------------------------------------------------
+
+-module(mod_proxy65_service).
+-author('xram@jabber.ru').
+
+-behaviour(gen_server).
+
+%% gen_server callbacks.
+-export([init/1,
+ handle_info/2,
+ handle_call/3,
+ handle_cast/2,
+ terminate/2,
+ code_change/3
+ ]).
+
+%% API.
+-export([start_link/2]).
+
+-include("../ejabberd.hrl").
+-include("../jlib.hrl").
+
+-define(PROCNAME, ejabberd_mod_proxy65_service).
+
+-record(state, {
+ myhost,
+ serverhost,
+ name,
+ stream_addr,
+ port,
+ acl
+ }).
+
+%% Unused callbacks.
+handle_cast(_Request, State) ->
+ {noreply, State}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+handle_call(_Request, _From, State) ->
+ {reply, ok, State}.
+%%----------------
+
+start_link(Host, Opts) ->
+ Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
+ gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []).
+
+init([Host, Opts]) ->
+ {IP, State} = parse_options(Host, Opts),
+ NewOpts = [Host, {ip, IP} | Opts],
+ ejabberd_listener:add_listener(State#state.port, mod_proxy65_stream, NewOpts),
+ ejabberd_router:register_route(State#state.myhost),
+ {ok, State}.
+
+terminate(_Reason, #state{myhost=MyHost, port=Port}) ->
+ catch ejabberd_listener:delete_listener(Port),
+ ejabberd_router:unregister_route(MyHost),
+ ok.
+
+handle_info({route, From, To, {xmlelement, "iq", _, _} = Packet}, State) ->
+ IQ = jlib:iq_query_info(Packet),
+ case catch process_iq(From, IQ, State) of
+ Result when is_record(Result, iq) ->
+ ejabberd_router:route(To, From, jlib:iq_to_xml(Result));
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("Error when processing IQ stanza: ~p", [Reason]),
+ Err = jlib:make_error_reply(Packet, ?ERR_INTERNAL_SERVER_ERROR),
+ ejabberd_router:route(To, From, Err);
+ _ ->
+ ok
+ end,
+ {noreply, State};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%%------------------------
+%%% IQ Processing
+%%%------------------------
+
+%% disco#info request
+process_iq(_, #iq{type = get, xmlns = ?NS_DISCO_INFO} = IQ, #state{name=Name}) ->
+ IQ#iq{type = result, sub_el =
+ [{xmlelement, "query", [{"xmlns", ?NS_DISCO_INFO}], iq_disco_info(Name)}]};
+
+%% disco#items request
+process_iq(_, #iq{type = get, xmlns = ?NS_DISCO_ITEMS} = IQ, _) ->
+ IQ#iq{type = result, sub_el =
+ [{xmlelement, "query", [{"xmlns", ?NS_DISCO_ITEMS}], []}]};
+
+%% vCard request
+process_iq(_, #iq{type = get, xmlns = ?NS_VCARD, lang = Lang} = IQ, _) ->
+ IQ#iq{type = result, sub_el =
+ [{xmlelement, "vCard", [{"xmlns", ?NS_VCARD}], iq_vcard(Lang)}]};
+
+%% bytestreams info request
+process_iq(JID, #iq{type = get, sub_el = SubEl, xmlns = ?NS_BYTESTREAMS} = IQ,
+ #state{acl = ACL, stream_addr = StreamAddr, serverhost = ServerHost}) ->
+ case acl:match_rule(ServerHost, ACL, JID) of
+ allow ->
+ StreamHostEl = [{xmlelement, "streamhost", StreamAddr, []}],
+ IQ#iq{type = result, sub_el =
+ [{xmlelement, "query", [{"xmlns", ?NS_BYTESTREAMS}], StreamHostEl}]};
+ deny ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_FORBIDDEN]}
+ end;
+
+%% bytestream activation request
+process_iq(InitiatorJID, #iq{type = set, sub_el = SubEl, xmlns = ?NS_BYTESTREAMS} = IQ,
+ #state{acl = ACL, serverhost = ServerHost}) ->
+ case acl:match_rule(ServerHost, ACL, InitiatorJID) of
+ allow ->
+ ActivateEl = xml:get_path_s(SubEl, [{elem, "activate"}]),
+ SID = xml:get_tag_attr_s("sid", SubEl),
+ case catch jlib:string_to_jid(xml:get_tag_cdata(ActivateEl)) of
+ TargetJID when is_record(TargetJID, jid), SID /= "",
+ length(SID) =< 128, TargetJID /= InitiatorJID ->
+ Target = jlib:jid_to_string(jlib:jid_tolower(TargetJID)),
+ Initiator = jlib:jid_to_string(jlib:jid_tolower(InitiatorJID)),
+ SHA1 = sha:sha(SID ++ Initiator ++ Target),
+ case mod_proxy65_sm:activate_stream(SHA1, InitiatorJID, TargetJID, ServerHost) of
+ ok ->
+ IQ#iq{type = result, sub_el = []};
+ false ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_ITEM_NOT_FOUND]};
+ limit ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_RESOURCE_CONSTRAINT]};
+ conflict ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_CONFLICT]};
+ _ ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_INTERNAL_SERVER_ERROR]}
+ end;
+ _ ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_BAD_REQUEST]}
+ end;
+ deny ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_FORBIDDEN]}
+ end;
+
+%% Unknown "set" or "get" request
+process_iq(_, #iq{type=Type, sub_el=SubEl} = IQ, _) when Type==get; Type==set ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_SERVICE_UNAVAILABLE]};
+
+%% IQ "result" or "error".
+process_iq(_, _, _) ->
+ ok.
+
+%%%-------------------------
+%%% Auxiliary functions.
+%%%-------------------------
+-define(FEATURE(Feat), {xmlelement,"feature",[{"var", Feat}],[]}).
+
+iq_disco_info(Name) ->
+ [{xmlelement, "identity",
+ [{"category", "proxy"},
+ {"type", "bytestreams"},
+ {"name", Name}], []},
+ ?FEATURE(?NS_DISCO_INFO),
+ ?FEATURE(?NS_DISCO_ITEMS),
+ ?FEATURE(?NS_VCARD),
+ ?FEATURE(?NS_BYTESTREAMS)].
+
+iq_vcard(Lang) ->
+ [{xmlelement, "FN", [],
+ [{xmlcdata, "ejabberd/mod_proxy65"}]},
+ {xmlelement, "URL", [],
+ [{xmlcdata, ?EJABBERD_URI}]},
+ {xmlelement, "DESC", [],
+ [{xmlcdata, translate:translate(Lang, "ejabberd SOCKS5 Bytestreams module\n"
+ "Copyright (c) 2003-2006 Alexey Shchepin")}]}].
+
+parse_options(ServerHost, Opts) ->
+ MyHost = gen_mod:get_opt(host, Opts, "proxy." ++ ServerHost),
+ Port = gen_mod:get_opt(port, Opts, 7777),
+ ACL = gen_mod:get_opt(access, Opts, all),
+ Name = gen_mod:get_opt(name, Opts, "SOCKS5 Bytestreams"),
+ IP = case gen_mod:get_opt(ip, Opts, none) of
+ none ->
+ case inet:getaddr(MyHost, inet) of
+ {ok, Addr} -> Addr;
+ {error, _} -> {127,0,0,1}
+ end;
+ Addr ->
+ Addr
+ end,
+ [_ | StrIP] = lists:append([[$. | integer_to_list(X)] || X <- inet:ip_to_bytes(IP)]),
+ StreamAddr = [{"jid", MyHost}, {"host", StrIP}, {"port", integer_to_list(Port)}],
+ {IP, #state{myhost = MyHost,
+ serverhost = ServerHost,
+ name = Name,
+ port = Port,
+ stream_addr = StreamAddr,
+ acl = ACL}}.
diff --git a/src/mod_proxy65/mod_proxy65_sm.erl b/src/mod_proxy65/mod_proxy65_sm.erl
new file mode 100644
index 00000000..e799432f
--- /dev/null
+++ b/src/mod_proxy65/mod_proxy65_sm.erl
@@ -0,0 +1,162 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_proxy65_sm.erl
+%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
+%%% Purpose : Bytestreams manager.
+%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
+%%% Id : $Id$
+%%%----------------------------------------------------------------------
+
+-module(mod_proxy65_sm).
+-author('xram@jabber.ru').
+
+-behaviour(gen_server).
+
+%% gen_server callbacks.
+-export([init/1,
+ handle_info/2,
+ handle_call/3,
+ handle_cast/2,
+ terminate/2,
+ code_change/3
+ ]).
+
+%% API.
+-export([
+ start_link/2,
+ register_stream/1,
+ unregister_stream/1,
+ activate_stream/4
+ ]).
+
+-record(state, {max_connections}).
+-record(bytestream, {
+ sha1, %% SHA1 key
+ target, %% Target Pid
+ initiator, %% Initiator Pid
+ active = false, %% Activity flag
+ jid_i %% Initiator's JID
+ }).
+
+-define(PROCNAME, ejabberd_mod_proxy65_sm).
+
+%% Unused callbacks.
+handle_cast(_Request, State) ->
+ {noreply, State}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+handle_info(_Info, State) ->
+ {noreply, State}.
+%%----------------
+
+start_link(Host, Opts) ->
+ Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
+ gen_server:start_link({local, Proc}, ?MODULE, [Opts], []).
+
+init([Opts]) ->
+ mnesia:create_table(bytestream, [{ram_copies, [node()]},
+ {attributes, record_info(fields, bytestream)}]),
+ MaxConnections = gen_mod:get_opt(max_connections, Opts, infinity),
+ {ok, #state{max_connections=MaxConnections}}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+handle_call({activate, SHA1, IJid}, _From, State) ->
+ MaxConns = State#state.max_connections,
+ F = fun() ->
+ case mnesia:read(bytestream, SHA1, write) of
+ [#bytestream{target = TPid, initiator = IPid} = ByteStream]
+ when is_pid(TPid), is_pid(IPid) ->
+ ActiveFlag = ByteStream#bytestream.active,
+ if
+ ActiveFlag == false ->
+ ConnsPerJID =
+ mnesia:select(bytestream,
+ [{#bytestream{sha1 = '$1',
+ jid_i = IJid,
+ _='_'},
+ [],
+ ['$1']}]),
+ if
+ length(ConnsPerJID) < MaxConns ->
+ mnesia:write(
+ ByteStream#bytestream{active = true,
+ jid_i = IJid}),
+ {ok, IPid, TPid};
+ true ->
+ {limit, IPid, TPid}
+ end;
+ true ->
+ conflict
+ end;
+ _ ->
+ false
+ end
+ end,
+ Reply = mnesia:transaction(F),
+ {reply, Reply, State};
+
+handle_call(_Request, _From, State) ->
+ {reply, ok, State}.
+
+%%%----------------------
+%%% API.
+%%%----------------------
+%%%---------------------------------------------------
+%%% register_stream(SHA1) -> {atomic, ok} |
+%%% {atomic, error} |
+%%% transaction abort
+%%% SHA1 = string()
+%%%---------------------------------------------------
+register_stream(SHA1) when is_list(SHA1) ->
+ StreamPid = self(),
+ F = fun() ->
+ case mnesia:read(bytestream, SHA1, write) of
+ [] ->
+ mnesia:write(#bytestream{sha1 = SHA1,
+ target = StreamPid});
+ [#bytestream{target = Pid,
+ initiator = undefined} = ByteStream]
+ when is_pid(Pid), Pid /= StreamPid ->
+ mnesia:write(
+ ByteStream#bytestream{initiator = StreamPid});
+ _ ->
+ error
+ end
+ end,
+ mnesia:transaction(F).
+
+%%%----------------------------------------------------
+%%% unregister_stream(SHA1) -> ok | transaction abort
+%%% SHA1 = string()
+%%%----------------------------------------------------
+unregister_stream(SHA1) when is_list(SHA1) ->
+ F = fun() -> mnesia:delete({bytestream, SHA1}) end,
+ mnesia:transaction(F).
+
+%%%--------------------------------------------------------
+%%% activate_stream(SHA1, IJid, TJid, Host) -> ok |
+%%% false |
+%%% limit |
+%%% conflict |
+%%% error
+%%% SHA1 = string()
+%%% IJid = TJid = jid()
+%%% Host = string()
+%%%--------------------------------------------------------
+activate_stream(SHA1, IJid, TJid, Host) when is_list(SHA1) ->
+ Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
+ case catch gen_server:call(Proc, {activate, SHA1, IJid}) of
+ {atomic, {ok, IPid, TPid}} ->
+ mod_proxy65_stream:activate({IPid, IJid}, {TPid, TJid});
+ {atomic, {limit, IPid, TPid}} ->
+ mod_proxy65_stream:stop(IPid),
+ mod_proxy65_stream:stop(TPid),
+ limit;
+ {atomic, conflict} ->
+ conflict;
+ {atomic, false} ->
+ false;
+ _ ->
+ error
+ end.
diff --git a/src/mod_proxy65/mod_proxy65_stream.erl b/src/mod_proxy65/mod_proxy65_stream.erl
new file mode 100644
index 00000000..1f4a75dd
--- /dev/null
+++ b/src/mod_proxy65/mod_proxy65_stream.erl
@@ -0,0 +1,273 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_proxy65_stream.erl
+%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
+%%% Purpose : Bytestream process.
+%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
+%%% Id : $Id$
+%%%----------------------------------------------------------------------
+
+-module(mod_proxy65_stream).
+-author('xram@jabber.ru').
+
+-behaviour(gen_fsm).
+
+%% gen_fsm callbacks.
+-export([
+ init/1,
+ handle_event/3,
+ handle_sync_event/4,
+ code_change/4,
+ handle_info/3,
+ terminate/3
+ ]).
+
+%% gen_fsm states.
+-export([
+ wait_for_init/2,
+ wait_for_auth/2,
+ wait_for_request/2,
+ wait_for_activation/2,
+ stream_established/2
+ ]).
+
+%% API.
+-export([
+ start/2,
+ stop/1,
+ start_link/3,
+ activate/2,
+ relay/3,
+ socket_type/0
+ ]).
+
+-include("mod_proxy65.hrl").
+-include("../ejabberd.hrl").
+
+-define(WAIT_TIMEOUT, 60000). %% 1 minute (is it enough?)
+
+-record(state, {
+ socket, %% TCP socket
+ timer, %% timer reference
+ sha1, %% SHA1 key
+ host, %% virtual host
+ auth_type, %% authentication type: anonymous or plain
+ shaper, %% Shaper name
+ active = false %% Activity flag
+ }).
+
+%% Unused callbacks
+handle_event(_Event, StateName, StateData) ->
+ {next_state, StateName, StateData}.
+code_change(_OldVsn, StateName, StateData, _Extra) ->
+ {ok, StateName, StateData}.
+%%-------------------------------
+
+start({gen_tcp, Socket}, [Host | Opts]) ->
+ Supervisor = gen_mod:get_module_proc(Host, ejabberd_mod_proxy65_sup),
+ supervisor:start_child(Supervisor, [Socket, Host, Opts]).
+
+start_link(Socket, Host, Opts) ->
+ gen_fsm:start_link(?MODULE, [Socket, Host, Opts], []).
+
+init([Socket, Host, Opts]) ->
+ process_flag(trap_exit, true),
+ AuthType = gen_mod:get_opt(auth_type, Opts, anonymous),
+ Shaper = gen_mod:get_opt(shaper, Opts, none),
+ RecvBuf = gen_mod:get_opt(recbuf, Opts, 65535),
+ SendBuf = gen_mod:get_opt(sndbuf, Opts, 65535),
+ TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop),
+ inet:setopts(Socket, [{active, true}, {recbuf, RecvBuf}, {sndbuf, SendBuf}]),
+ {ok, wait_for_init, #state{host = Host,
+ auth_type = AuthType,
+ socket = Socket,
+ shaper = Shaper,
+ timer = TRef}}.
+
+terminate(_Reason, _StateName, #state{sha1=SHA1,active=Flag}) ->
+ catch mod_proxy65_sm:unregister_stream(SHA1),
+ if Flag==true ->
+ ?INFO_MSG("Bytestream terminated", []);
+ true ->
+ ok
+ end.
+
+%%%------------------------------
+%%% API.
+%%%------------------------------
+socket_type() ->
+ raw.
+
+stop(StreamPid) ->
+ StreamPid ! stop.
+
+activate({P1, J1}, {P2, J2}) ->
+ case catch {gen_fsm:sync_send_all_state_event(P1, get_socket),
+ gen_fsm:sync_send_all_state_event(P2, get_socket)} of
+ {S1, S2} when is_port(S1), is_port(S2) ->
+ P1 ! {activate, P2, S2, J1, J2},
+ P2 ! {activate, P1, S1, J1, J2},
+ JID1 = jlib:jid_to_string(J1),
+ JID2 = jlib:jid_to_string(J2),
+ ?INFO_MSG("(~w:~w) Activated bytestream for ~s -> ~s", [P1, P2, JID1, JID2]),
+ ok;
+ _ ->
+ error
+ end.
+
+%%%-----------------------
+%%% States
+%%%-----------------------
+wait_for_init(Packet, #state{socket=Socket, auth_type=AuthType} = StateData) ->
+ case mod_proxy65_lib:unpack_init_message(Packet) of
+ {ok, AuthMethods} ->
+ Method = select_auth_method(AuthType, AuthMethods),
+ gen_tcp:send(Socket, mod_proxy65_lib:make_init_reply(Method)),
+ case Method of
+ ?AUTH_ANONYMOUS ->
+ {next_state, wait_for_request, StateData};
+ ?AUTH_PLAIN ->
+ {next_state, wait_for_auth, StateData};
+ ?AUTH_NO_METHODS ->
+ {stop, normal, StateData}
+ end;
+ error ->
+ {stop, normal, StateData}
+ end.
+
+wait_for_auth(Packet, #state{socket=Socket, host=Host} = StateData) ->
+ case mod_proxy65_lib:unpack_auth_request(Packet) of
+ {User, Pass} ->
+ Result = ejabberd_auth:check_password(User, Host, Pass),
+ gen_tcp:send(Socket, mod_proxy65_lib:make_auth_reply(Result)),
+ case Result of
+ true ->
+ {next_state, wait_for_request, StateData};
+ false ->
+ {stop, normal, StateData}
+ end;
+ _ ->
+ {stop, normal, StateData}
+ end.
+
+wait_for_request(Packet, #state{socket=Socket} = StateData) ->
+ Request = mod_proxy65_lib:unpack_request(Packet),
+ case Request of
+ #s5_request{sha1=SHA1, cmd=connect} ->
+ case catch mod_proxy65_sm:register_stream(SHA1) of
+ {atomic, ok} ->
+ inet:setopts(Socket, [{active, false}]),
+ gen_tcp:send(Socket, mod_proxy65_lib:make_reply()),
+ {next_state, wait_for_activation, StateData#state{sha1=SHA1}};
+ _ ->
+ Err = mod_proxy65_lib:make_error_reply(Request),
+ gen_tcp:send(Socket, Err),
+ {stop, normal, StateData}
+ end;
+ #s5_request{cmd=udp} ->
+ Err = mod_proxy65_lib:make_error_reply(Request, ?ERR_COMMAND_NOT_SUPPORTED),
+ gen_tcp:send(Socket, Err),
+ {stop, normal, StateData};
+ _ ->
+ {stop, normal, StateData}
+ end.
+
+wait_for_activation(_Data, StateData) ->
+ {next_state, wait_for_activation, StateData}.
+
+stream_established(_Data, StateData) ->
+ {next_state, stream_established, StateData}.
+
+%%%-----------------------
+%%% Callbacks processing
+%%%-----------------------
+
+%% SOCKS5 packets.
+handle_info({tcp, _S, Data}, StateName, StateData)
+ when StateName /= wait_for_activation ->
+ erlang:cancel_timer(StateData#state.timer),
+ TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop),
+ gen_fsm:send_event(self(), Data),
+ {next_state, StateName, StateData#state{timer=TRef}};
+
+%% Activation message.
+handle_info({activate, PeerPid, PeerSocket, IJid, TJid},
+ wait_for_activation, StateData) ->
+ erlang:monitor(process, PeerPid),
+ erlang:cancel_timer(StateData#state.timer),
+ MySocket = StateData#state.socket,
+ Shaper = StateData#state.shaper,
+ Host = StateData#state.host,
+ MaxRate = find_maxrate(Shaper, IJid, TJid, Host),
+ spawn_link(?MODULE, relay, [MySocket, PeerSocket, MaxRate]),
+ {next_state, stream_established, StateData#state{active=true}};
+
+%% Socket closed
+handle_info({tcp_closed, _Socket}, _StateName, StateData) ->
+ {stop, normal, StateData};
+handle_info({tcp_error, _Socket, _Reason}, _StateName, StateData) ->
+ {stop, normal, StateData};
+
+%% Got stop message.
+handle_info(stop, _StateName, StateData) ->
+ {stop, normal, StateData};
+
+%% Either linked process or peer process died.
+handle_info({'EXIT',_,_}, _StateName, StateData) ->
+ {stop, normal, StateData};
+handle_info({'DOWN',_,_,_,_}, _StateName, StateData) ->
+ {stop, normal, StateData};
+
+%% Packets of no interest
+handle_info(_Info, StateName, StateData) ->
+ {next_state, StateName, StateData}.
+
+%% Socket request.
+handle_sync_event(get_socket, _From, wait_for_activation, StateData) ->
+ Socket = StateData#state.socket,
+ {reply, Socket, wait_for_activation, StateData};
+
+handle_sync_event(_Event, _From, StateName, StateData) ->
+ {reply, error, StateName, StateData}.
+
+%%%-------------------------------------------------
+%%% Relay Process.
+%%%-------------------------------------------------
+relay(MySocket, PeerSocket, Shaper) ->
+ case gen_tcp:recv(MySocket, 0) of
+ {ok, Data} ->
+ gen_tcp:send(PeerSocket, Data),
+ {NewShaper, Pause} = shaper:update(Shaper, size(Data)),
+ if
+ Pause > 0 -> timer:sleep(Pause);
+ true -> pass
+ end,
+ relay(MySocket, PeerSocket, NewShaper);
+ _ ->
+ stopped
+ end.
+
+%%%------------------------
+%%% Auxiliary functions
+%%%------------------------
+select_auth_method(plain, AuthMethods) ->
+ case lists:member(?AUTH_PLAIN, AuthMethods) of
+ true -> ?AUTH_PLAIN;
+ false -> ?AUTH_NO_METHODS
+ end;
+
+select_auth_method(anonymous, AuthMethods) ->
+ case lists:member(?AUTH_ANONYMOUS, AuthMethods) of
+ true -> ?AUTH_ANONYMOUS;
+ false -> ?AUTH_NO_METHODS
+ end.
+
+%% Obviously, we must use shaper with maximum rate.
+find_maxrate(Shaper, JID1, JID2, Host) ->
+ MaxRate1 = shaper:new(acl:match_rule(Host, Shaper, JID1)),
+ MaxRate2 = shaper:new(acl:match_rule(Host, Shaper, JID2)),
+ if
+ MaxRate1 == none; MaxRate2 == none ->
+ none;
+ true ->
+ lists:max([MaxRate1, MaxRate2])
+ end.