diff options
Diffstat (limited to '')
-rw-r--r-- | src/mod_proxy65/Makefile.in | 33 | ||||
-rw-r--r-- | src/mod_proxy65/Makefile.win32 | 32 | ||||
-rw-r--r-- | src/mod_proxy65/mod_proxy65.erl | 57 | ||||
-rw-r--r-- | src/mod_proxy65/mod_proxy65.hrl | 46 | ||||
-rw-r--r-- | src/mod_proxy65/mod_proxy65_lib.erl | 69 | ||||
-rw-r--r-- | src/mod_proxy65/mod_proxy65_service.erl | 198 | ||||
-rw-r--r-- | src/mod_proxy65/mod_proxy65_sm.erl | 162 | ||||
-rw-r--r-- | src/mod_proxy65/mod_proxy65_stream.erl | 273 |
8 files changed, 870 insertions, 0 deletions
diff --git a/src/mod_proxy65/Makefile.in b/src/mod_proxy65/Makefile.in new file mode 100644 index 00000000..671b1258 --- /dev/null +++ b/src/mod_proxy65/Makefile.in @@ -0,0 +1,33 @@ +# $Id$ + +CC = @CC@ +CFLAGS = @CFLAGS@ @ERLANG_CFLAGS@ +CPPFLAGS = @CPPFLAGS@ +LDFLAGS = @LDFLAGS@ +LIBS = @LIBS@ @ERLANG_LIBS@ + +SUBDIRS = + +OUTDIR = .. +EFLAGS = -I .. -pz .. +OBJS = \ + $(OUTDIR)/mod_proxy65.beam \ + $(OUTDIR)/mod_proxy65_service.beam \ + $(OUTDIR)/mod_proxy65_sm.beam \ + $(OUTDIR)/mod_proxy65_stream.beam \ + $(OUTDIR)/mod_proxy65_lib.beam + +all: $(OBJS) + +$(OUTDIR)/%.beam: %.erl + @ERLC@ -W $(EFLAGS) -o $(OUTDIR) $< + +clean: + rm -f $(OBJS) + +distclean: clean + rm -f Makefile + +TAGS: + etags *.erl + diff --git a/src/mod_proxy65/Makefile.win32 b/src/mod_proxy65/Makefile.win32 new file mode 100644 index 00000000..fdab1329 --- /dev/null +++ b/src/mod_proxy65/Makefile.win32 @@ -0,0 +1,32 @@ + +include ..\Makefile.inc + +OUTDIR = .. +EFLAGS = -I .. -pz .. + +OBJS = \ + $(OUTDIR)\mod_proxy65.beam \ + $(OUTDIR)\mod_proxy65_service.beam \ + $(OUTDIR)\mod_proxy65_sm.beam \ + $(OUTDIR)\mod_proxy65_stream.beam \ + $(OUTDIR)\mod_proxy65_lib.beam + +ALL : $(OBJS) + +CLEAN : + -@erase $(OBJS) + +$(OUTDIR)\mod_proxy65.beam : mod_proxy65.erl + erlc -W $(EFLAGS) -o $(OUTDIR) mod_proxy65.erl + +$(OUTDIR)\mod_proxy65_service.beam : mod_proxy65_service.erl + erlc -W $(EFLAGS) -o $(OUTDIR) mod_proxy65_service.erl + +$(OUTDIR)\mod_proxy65_sm.beam : mod_proxy65_sm.erl + erlc -W $(EFLAGS) -o $(OUTDIR) mod_mod_proxy65_sm.erl + +$(OUTDIR)\mod_proxy65_stream.beam : mod_proxy65_stream.erl + erlc -W $(EFLAGS) -o $(OUTDIR) mod_proxy65_stream.erl + +$(OUTDIR)\mod_proxy65_lib.beam : mod_proxy65_lib.erl + erlc -W $(EFLAGS) -o $(OUTDIR) mod_proxy65_lib.erl diff --git a/src/mod_proxy65/mod_proxy65.erl b/src/mod_proxy65/mod_proxy65.erl new file mode 100644 index 00000000..65719241 --- /dev/null +++ b/src/mod_proxy65/mod_proxy65.erl @@ -0,0 +1,57 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_proxy65.erl +%%% Author : Evgeniy Khramtsov <xram@jabber.ru> +%%% Purpose : Main supervisor. +%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru> +%%% Id : $Id$ +%%%---------------------------------------------------------------------- + +-module(mod_proxy65). +-author('xram@jabber.ru'). + +-behaviour(gen_mod). +-behaviour(supervisor). + +%% gen_mod callbacks. +-export([start/2, stop/1]). + +%% supervisor callbacks. +-export([init/1]). + +%% API. +-export([start_link/2]). + +-define(PROCNAME, ejabberd_mod_proxy65). + +start(Host, Opts) -> + Proc = gen_mod:get_module_proc(Host, ?PROCNAME), + ChildSpec = { + Proc, {?MODULE, start_link, [Host, Opts]}, + transient, infinity, supervisor, [?MODULE] + }, + supervisor:start_child(ejabberd_sup, ChildSpec). + +stop(Host) -> + Proc = gen_mod:get_module_proc(Host, ?PROCNAME), + supervisor:terminate_child(ejabberd_sup, Proc), + supervisor:delete_child(ejabberd_sup, Proc). + +start_link(Host, Opts) -> + Proc = gen_mod:get_module_proc(Host, ?PROCNAME), + supervisor:start_link({local, Proc}, ?MODULE, [Host, Opts]). + +init([Host, Opts]) -> + Service = + {mod_proxy65_service, {mod_proxy65_service, start_link, [Host, Opts]}, + transient, 5000, worker, [mod_proxy65_service]}, + StreamSupervisor = + {ejabberd_mod_proxy65_sup, + {ejabberd_tmp_sup, start_link, + [gen_mod:get_module_proc(Host, ejabberd_mod_proxy65_sup), + mod_proxy65_stream]}, + transient, infinity, supervisor, [ejabberd_tmp_sup]}, + StreamManager = + {mod_proxy65_sm, {mod_proxy65_sm, start_link, [Host, Opts]}, + transient, 5000, worker, [mod_proxy65_sm]}, + {ok, {{one_for_one, 10, 1}, + [StreamManager, StreamSupervisor, Service]}}. diff --git a/src/mod_proxy65/mod_proxy65.hrl b/src/mod_proxy65/mod_proxy65.hrl new file mode 100644 index 00000000..e0191238 --- /dev/null +++ b/src/mod_proxy65/mod_proxy65.hrl @@ -0,0 +1,46 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_proxy65.hrl +%%% Author : Evgeniy Khramtsov <xram@jabber.ru> +%%% Purpose : RFC 1928 constants. +%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru> +%%% Id : $Id$ +%%%---------------------------------------------------------------------- + +%% Version +-define(VERSION_5, 5). + +%% Authentication methods +-define(AUTH_ANONYMOUS, 0). +-define(AUTH_GSSAPI, 1). +-define(AUTH_PLAIN, 2). +-define(AUTH_NO_METHODS, 16#FF). + +%% Address Type +-define(ATYP_IPV4, 1). +-define(ATYP_DOMAINNAME, 3). +-define(ATYP_IPV6, 4). + +%% Commands +-define(CMD_CONNECT, 1). +-define(CMD_BIND, 2). +-define(CMD_UDP, 3). + +%% RFC 1928 replies +-define(SUCCESS, 0). +-define(ERR_GENERAL_FAILURE, 0). +-define(ERR_NOT_ALLOWED, 2). +-define(ERR_NETWORK_UNREACHABLE, 3). +-define(ERR_HOST_UNREACHABLE, 4). +-define(ERR_CONNECTION_REFUSED, 5). +-define(ERR_TTL_EXPIRED, 6). +-define(ERR_COMMAND_NOT_SUPPORTED, 7). +-define(ERR_ADDRESS_TYPE_NOT_SUPPORTED, 8). + +%% RFC 1928 defined timeout. +-define(SOCKS5_REPLY_TIMEOUT, 10000). + +-record(s5_request, { + rsv = 0, + cmd, + sha1 + }). diff --git a/src/mod_proxy65/mod_proxy65_lib.erl b/src/mod_proxy65/mod_proxy65_lib.erl new file mode 100644 index 00000000..09ee6b98 --- /dev/null +++ b/src/mod_proxy65/mod_proxy65_lib.erl @@ -0,0 +1,69 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_proxy65_lib.erl +%%% Author : Evgeniy Khramtsov <xram@jabber.ru> +%%% Purpose : SOCKS5 parsing library. +%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru> +%%% Id : $Id$ +%%%---------------------------------------------------------------------- + +-module(mod_proxy65_lib). +-author('xram@jabber.ru'). + +-include("mod_proxy65.hrl"). + +-export([ + unpack_init_message/1, + unpack_auth_request/1, + unpack_request/1, + make_init_reply/1, + make_auth_reply/1, + make_reply/0, + make_error_reply/1, + make_error_reply/2 + ]). + +unpack_init_message(<<?VERSION_5, N, AuthMethodList:N/binary>>) + when N > 0, N < 256 -> + {ok, binary_to_list(AuthMethodList)}; + +unpack_init_message(_) -> + error. + +unpack_auth_request(<<1, ULen, User:ULen/binary, + PLen, Pass:PLen/binary>>) when ULen < 256, PLen < 256 -> + {binary_to_list(User), binary_to_list(Pass)}; + +unpack_auth_request(_) -> + error. + +unpack_request(<<?VERSION_5, CMD, RSV, + ?ATYP_DOMAINNAME, 40, + SHA1:40/binary, 0, 0>>) when CMD == ?CMD_CONNECT; + CMD == ?CMD_UDP -> + Command = if + CMD == ?CMD_CONNECT -> connect; + CMD == ?CMD_UDP -> udp + end, + #s5_request{cmd = Command, rsv = RSV, sha1 = binary_to_list(SHA1)}; + +unpack_request(_) -> + error. + +make_init_reply(Method) -> + [?VERSION_5, Method]. + +make_auth_reply(true) -> [1, ?SUCCESS]; +make_auth_reply(false) -> [1, ?ERR_NOT_ALLOWED]. + +%% WARNING: According to SOCKS5 RFC, this reply is _incorrect_, but +%% Psi writes junk to the beginning of the file on correct reply. +%% I'm not sure, but there may be an issue with other clients. +%% Needs more testing. +make_reply() -> + [?VERSION_5, ?SUCCESS, 0, 0, 0, 0]. + +make_error_reply(Request) -> + make_error_reply(Request, ?ERR_NOT_ALLOWED). + +make_error_reply(#s5_request{rsv = RSV, sha1 = SHA1}, Reason) -> + [?VERSION_5, Reason, RSV, ?ATYP_DOMAINNAME, length(SHA1), SHA1, 0,0]. diff --git a/src/mod_proxy65/mod_proxy65_service.erl b/src/mod_proxy65/mod_proxy65_service.erl new file mode 100644 index 00000000..175ac4a6 --- /dev/null +++ b/src/mod_proxy65/mod_proxy65_service.erl @@ -0,0 +1,198 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_proxy65_service.erl +%%% Author : Evgeniy Khramtsov <xram@jabber.ru> +%%% Purpose : SOCKS5 Bytestreams XMPP service. +%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru> +%%% Id : $Id$ +%%%---------------------------------------------------------------------- + +-module(mod_proxy65_service). +-author('xram@jabber.ru'). + +-behaviour(gen_server). + +%% gen_server callbacks. +-export([init/1, + handle_info/2, + handle_call/3, + handle_cast/2, + terminate/2, + code_change/3 + ]). + +%% API. +-export([start_link/2]). + +-include("../ejabberd.hrl"). +-include("../jlib.hrl"). + +-define(PROCNAME, ejabberd_mod_proxy65_service). + +-record(state, { + myhost, + serverhost, + name, + stream_addr, + port, + acl + }). + +%% Unused callbacks. +handle_cast(_Request, State) -> + {noreply, State}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. +handle_call(_Request, _From, State) -> + {reply, ok, State}. +%%---------------- + +start_link(Host, Opts) -> + Proc = gen_mod:get_module_proc(Host, ?PROCNAME), + gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []). + +init([Host, Opts]) -> + {IP, State} = parse_options(Host, Opts), + NewOpts = [Host, {ip, IP} | Opts], + ejabberd_listener:add_listener(State#state.port, mod_proxy65_stream, NewOpts), + ejabberd_router:register_route(State#state.myhost), + {ok, State}. + +terminate(_Reason, #state{myhost=MyHost, port=Port}) -> + catch ejabberd_listener:delete_listener(Port), + ejabberd_router:unregister_route(MyHost), + ok. + +handle_info({route, From, To, {xmlelement, "iq", _, _} = Packet}, State) -> + IQ = jlib:iq_query_info(Packet), + case catch process_iq(From, IQ, State) of + Result when is_record(Result, iq) -> + ejabberd_router:route(To, From, jlib:iq_to_xml(Result)); + {'EXIT', Reason} -> + ?ERROR_MSG("Error when processing IQ stanza: ~p", [Reason]), + Err = jlib:make_error_reply(Packet, ?ERR_INTERNAL_SERVER_ERROR), + ejabberd_router:route(To, From, Err); + _ -> + ok + end, + {noreply, State}; + +handle_info(_Info, State) -> + {noreply, State}. + +%%%------------------------ +%%% IQ Processing +%%%------------------------ + +%% disco#info request +process_iq(_, #iq{type = get, xmlns = ?NS_DISCO_INFO} = IQ, #state{name=Name}) -> + IQ#iq{type = result, sub_el = + [{xmlelement, "query", [{"xmlns", ?NS_DISCO_INFO}], iq_disco_info(Name)}]}; + +%% disco#items request +process_iq(_, #iq{type = get, xmlns = ?NS_DISCO_ITEMS} = IQ, _) -> + IQ#iq{type = result, sub_el = + [{xmlelement, "query", [{"xmlns", ?NS_DISCO_ITEMS}], []}]}; + +%% vCard request +process_iq(_, #iq{type = get, xmlns = ?NS_VCARD, lang = Lang} = IQ, _) -> + IQ#iq{type = result, sub_el = + [{xmlelement, "vCard", [{"xmlns", ?NS_VCARD}], iq_vcard(Lang)}]}; + +%% bytestreams info request +process_iq(JID, #iq{type = get, sub_el = SubEl, xmlns = ?NS_BYTESTREAMS} = IQ, + #state{acl = ACL, stream_addr = StreamAddr, serverhost = ServerHost}) -> + case acl:match_rule(ServerHost, ACL, JID) of + allow -> + StreamHostEl = [{xmlelement, "streamhost", StreamAddr, []}], + IQ#iq{type = result, sub_el = + [{xmlelement, "query", [{"xmlns", ?NS_BYTESTREAMS}], StreamHostEl}]}; + deny -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_FORBIDDEN]} + end; + +%% bytestream activation request +process_iq(InitiatorJID, #iq{type = set, sub_el = SubEl, xmlns = ?NS_BYTESTREAMS} = IQ, + #state{acl = ACL, serverhost = ServerHost}) -> + case acl:match_rule(ServerHost, ACL, InitiatorJID) of + allow -> + ActivateEl = xml:get_path_s(SubEl, [{elem, "activate"}]), + SID = xml:get_tag_attr_s("sid", SubEl), + case catch jlib:string_to_jid(xml:get_tag_cdata(ActivateEl)) of + TargetJID when is_record(TargetJID, jid), SID /= "", + length(SID) =< 128, TargetJID /= InitiatorJID -> + Target = jlib:jid_to_string(jlib:jid_tolower(TargetJID)), + Initiator = jlib:jid_to_string(jlib:jid_tolower(InitiatorJID)), + SHA1 = sha:sha(SID ++ Initiator ++ Target), + case mod_proxy65_sm:activate_stream(SHA1, InitiatorJID, TargetJID, ServerHost) of + ok -> + IQ#iq{type = result, sub_el = []}; + false -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_ITEM_NOT_FOUND]}; + limit -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_RESOURCE_CONSTRAINT]}; + conflict -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_CONFLICT]}; + _ -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_INTERNAL_SERVER_ERROR]} + end; + _ -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_BAD_REQUEST]} + end; + deny -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_FORBIDDEN]} + end; + +%% Unknown "set" or "get" request +process_iq(_, #iq{type=Type, sub_el=SubEl} = IQ, _) when Type==get; Type==set -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_SERVICE_UNAVAILABLE]}; + +%% IQ "result" or "error". +process_iq(_, _, _) -> + ok. + +%%%------------------------- +%%% Auxiliary functions. +%%%------------------------- +-define(FEATURE(Feat), {xmlelement,"feature",[{"var", Feat}],[]}). + +iq_disco_info(Name) -> + [{xmlelement, "identity", + [{"category", "proxy"}, + {"type", "bytestreams"}, + {"name", Name}], []}, + ?FEATURE(?NS_DISCO_INFO), + ?FEATURE(?NS_DISCO_ITEMS), + ?FEATURE(?NS_VCARD), + ?FEATURE(?NS_BYTESTREAMS)]. + +iq_vcard(Lang) -> + [{xmlelement, "FN", [], + [{xmlcdata, "ejabberd/mod_proxy65"}]}, + {xmlelement, "URL", [], + [{xmlcdata, ?EJABBERD_URI}]}, + {xmlelement, "DESC", [], + [{xmlcdata, translate:translate(Lang, "ejabberd SOCKS5 Bytestreams module\n" + "Copyright (c) 2003-2006 Alexey Shchepin")}]}]. + +parse_options(ServerHost, Opts) -> + MyHost = gen_mod:get_opt(host, Opts, "proxy." ++ ServerHost), + Port = gen_mod:get_opt(port, Opts, 7777), + ACL = gen_mod:get_opt(access, Opts, all), + Name = gen_mod:get_opt(name, Opts, "SOCKS5 Bytestreams"), + IP = case gen_mod:get_opt(ip, Opts, none) of + none -> + case inet:getaddr(MyHost, inet) of + {ok, Addr} -> Addr; + {error, _} -> {127,0,0,1} + end; + Addr -> + Addr + end, + [_ | StrIP] = lists:append([[$. | integer_to_list(X)] || X <- inet:ip_to_bytes(IP)]), + StreamAddr = [{"jid", MyHost}, {"host", StrIP}, {"port", integer_to_list(Port)}], + {IP, #state{myhost = MyHost, + serverhost = ServerHost, + name = Name, + port = Port, + stream_addr = StreamAddr, + acl = ACL}}. diff --git a/src/mod_proxy65/mod_proxy65_sm.erl b/src/mod_proxy65/mod_proxy65_sm.erl new file mode 100644 index 00000000..e799432f --- /dev/null +++ b/src/mod_proxy65/mod_proxy65_sm.erl @@ -0,0 +1,162 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_proxy65_sm.erl +%%% Author : Evgeniy Khramtsov <xram@jabber.ru> +%%% Purpose : Bytestreams manager. +%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru> +%%% Id : $Id$ +%%%---------------------------------------------------------------------- + +-module(mod_proxy65_sm). +-author('xram@jabber.ru'). + +-behaviour(gen_server). + +%% gen_server callbacks. +-export([init/1, + handle_info/2, + handle_call/3, + handle_cast/2, + terminate/2, + code_change/3 + ]). + +%% API. +-export([ + start_link/2, + register_stream/1, + unregister_stream/1, + activate_stream/4 + ]). + +-record(state, {max_connections}). +-record(bytestream, { + sha1, %% SHA1 key + target, %% Target Pid + initiator, %% Initiator Pid + active = false, %% Activity flag + jid_i %% Initiator's JID + }). + +-define(PROCNAME, ejabberd_mod_proxy65_sm). + +%% Unused callbacks. +handle_cast(_Request, State) -> + {noreply, State}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. +handle_info(_Info, State) -> + {noreply, State}. +%%---------------- + +start_link(Host, Opts) -> + Proc = gen_mod:get_module_proc(Host, ?PROCNAME), + gen_server:start_link({local, Proc}, ?MODULE, [Opts], []). + +init([Opts]) -> + mnesia:create_table(bytestream, [{ram_copies, [node()]}, + {attributes, record_info(fields, bytestream)}]), + MaxConnections = gen_mod:get_opt(max_connections, Opts, infinity), + {ok, #state{max_connections=MaxConnections}}. + +terminate(_Reason, _State) -> + ok. + +handle_call({activate, SHA1, IJid}, _From, State) -> + MaxConns = State#state.max_connections, + F = fun() -> + case mnesia:read(bytestream, SHA1, write) of + [#bytestream{target = TPid, initiator = IPid} = ByteStream] + when is_pid(TPid), is_pid(IPid) -> + ActiveFlag = ByteStream#bytestream.active, + if + ActiveFlag == false -> + ConnsPerJID = + mnesia:select(bytestream, + [{#bytestream{sha1 = '$1', + jid_i = IJid, + _='_'}, + [], + ['$1']}]), + if + length(ConnsPerJID) < MaxConns -> + mnesia:write( + ByteStream#bytestream{active = true, + jid_i = IJid}), + {ok, IPid, TPid}; + true -> + {limit, IPid, TPid} + end; + true -> + conflict + end; + _ -> + false + end + end, + Reply = mnesia:transaction(F), + {reply, Reply, State}; + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +%%%---------------------- +%%% API. +%%%---------------------- +%%%--------------------------------------------------- +%%% register_stream(SHA1) -> {atomic, ok} | +%%% {atomic, error} | +%%% transaction abort +%%% SHA1 = string() +%%%--------------------------------------------------- +register_stream(SHA1) when is_list(SHA1) -> + StreamPid = self(), + F = fun() -> + case mnesia:read(bytestream, SHA1, write) of + [] -> + mnesia:write(#bytestream{sha1 = SHA1, + target = StreamPid}); + [#bytestream{target = Pid, + initiator = undefined} = ByteStream] + when is_pid(Pid), Pid /= StreamPid -> + mnesia:write( + ByteStream#bytestream{initiator = StreamPid}); + _ -> + error + end + end, + mnesia:transaction(F). + +%%%---------------------------------------------------- +%%% unregister_stream(SHA1) -> ok | transaction abort +%%% SHA1 = string() +%%%---------------------------------------------------- +unregister_stream(SHA1) when is_list(SHA1) -> + F = fun() -> mnesia:delete({bytestream, SHA1}) end, + mnesia:transaction(F). + +%%%-------------------------------------------------------- +%%% activate_stream(SHA1, IJid, TJid, Host) -> ok | +%%% false | +%%% limit | +%%% conflict | +%%% error +%%% SHA1 = string() +%%% IJid = TJid = jid() +%%% Host = string() +%%%-------------------------------------------------------- +activate_stream(SHA1, IJid, TJid, Host) when is_list(SHA1) -> + Proc = gen_mod:get_module_proc(Host, ?PROCNAME), + case catch gen_server:call(Proc, {activate, SHA1, IJid}) of + {atomic, {ok, IPid, TPid}} -> + mod_proxy65_stream:activate({IPid, IJid}, {TPid, TJid}); + {atomic, {limit, IPid, TPid}} -> + mod_proxy65_stream:stop(IPid), + mod_proxy65_stream:stop(TPid), + limit; + {atomic, conflict} -> + conflict; + {atomic, false} -> + false; + _ -> + error + end. diff --git a/src/mod_proxy65/mod_proxy65_stream.erl b/src/mod_proxy65/mod_proxy65_stream.erl new file mode 100644 index 00000000..1f4a75dd --- /dev/null +++ b/src/mod_proxy65/mod_proxy65_stream.erl @@ -0,0 +1,273 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_proxy65_stream.erl +%%% Author : Evgeniy Khramtsov <xram@jabber.ru> +%%% Purpose : Bytestream process. +%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru> +%%% Id : $Id$ +%%%---------------------------------------------------------------------- + +-module(mod_proxy65_stream). +-author('xram@jabber.ru'). + +-behaviour(gen_fsm). + +%% gen_fsm callbacks. +-export([ + init/1, + handle_event/3, + handle_sync_event/4, + code_change/4, + handle_info/3, + terminate/3 + ]). + +%% gen_fsm states. +-export([ + wait_for_init/2, + wait_for_auth/2, + wait_for_request/2, + wait_for_activation/2, + stream_established/2 + ]). + +%% API. +-export([ + start/2, + stop/1, + start_link/3, + activate/2, + relay/3, + socket_type/0 + ]). + +-include("mod_proxy65.hrl"). +-include("../ejabberd.hrl"). + +-define(WAIT_TIMEOUT, 60000). %% 1 minute (is it enough?) + +-record(state, { + socket, %% TCP socket + timer, %% timer reference + sha1, %% SHA1 key + host, %% virtual host + auth_type, %% authentication type: anonymous or plain + shaper, %% Shaper name + active = false %% Activity flag + }). + +%% Unused callbacks +handle_event(_Event, StateName, StateData) -> + {next_state, StateName, StateData}. +code_change(_OldVsn, StateName, StateData, _Extra) -> + {ok, StateName, StateData}. +%%------------------------------- + +start({gen_tcp, Socket}, [Host | Opts]) -> + Supervisor = gen_mod:get_module_proc(Host, ejabberd_mod_proxy65_sup), + supervisor:start_child(Supervisor, [Socket, Host, Opts]). + +start_link(Socket, Host, Opts) -> + gen_fsm:start_link(?MODULE, [Socket, Host, Opts], []). + +init([Socket, Host, Opts]) -> + process_flag(trap_exit, true), + AuthType = gen_mod:get_opt(auth_type, Opts, anonymous), + Shaper = gen_mod:get_opt(shaper, Opts, none), + RecvBuf = gen_mod:get_opt(recbuf, Opts, 65535), + SendBuf = gen_mod:get_opt(sndbuf, Opts, 65535), + TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop), + inet:setopts(Socket, [{active, true}, {recbuf, RecvBuf}, {sndbuf, SendBuf}]), + {ok, wait_for_init, #state{host = Host, + auth_type = AuthType, + socket = Socket, + shaper = Shaper, + timer = TRef}}. + +terminate(_Reason, _StateName, #state{sha1=SHA1,active=Flag}) -> + catch mod_proxy65_sm:unregister_stream(SHA1), + if Flag==true -> + ?INFO_MSG("Bytestream terminated", []); + true -> + ok + end. + +%%%------------------------------ +%%% API. +%%%------------------------------ +socket_type() -> + raw. + +stop(StreamPid) -> + StreamPid ! stop. + +activate({P1, J1}, {P2, J2}) -> + case catch {gen_fsm:sync_send_all_state_event(P1, get_socket), + gen_fsm:sync_send_all_state_event(P2, get_socket)} of + {S1, S2} when is_port(S1), is_port(S2) -> + P1 ! {activate, P2, S2, J1, J2}, + P2 ! {activate, P1, S1, J1, J2}, + JID1 = jlib:jid_to_string(J1), + JID2 = jlib:jid_to_string(J2), + ?INFO_MSG("(~w:~w) Activated bytestream for ~s -> ~s", [P1, P2, JID1, JID2]), + ok; + _ -> + error + end. + +%%%----------------------- +%%% States +%%%----------------------- +wait_for_init(Packet, #state{socket=Socket, auth_type=AuthType} = StateData) -> + case mod_proxy65_lib:unpack_init_message(Packet) of + {ok, AuthMethods} -> + Method = select_auth_method(AuthType, AuthMethods), + gen_tcp:send(Socket, mod_proxy65_lib:make_init_reply(Method)), + case Method of + ?AUTH_ANONYMOUS -> + {next_state, wait_for_request, StateData}; + ?AUTH_PLAIN -> + {next_state, wait_for_auth, StateData}; + ?AUTH_NO_METHODS -> + {stop, normal, StateData} + end; + error -> + {stop, normal, StateData} + end. + +wait_for_auth(Packet, #state{socket=Socket, host=Host} = StateData) -> + case mod_proxy65_lib:unpack_auth_request(Packet) of + {User, Pass} -> + Result = ejabberd_auth:check_password(User, Host, Pass), + gen_tcp:send(Socket, mod_proxy65_lib:make_auth_reply(Result)), + case Result of + true -> + {next_state, wait_for_request, StateData}; + false -> + {stop, normal, StateData} + end; + _ -> + {stop, normal, StateData} + end. + +wait_for_request(Packet, #state{socket=Socket} = StateData) -> + Request = mod_proxy65_lib:unpack_request(Packet), + case Request of + #s5_request{sha1=SHA1, cmd=connect} -> + case catch mod_proxy65_sm:register_stream(SHA1) of + {atomic, ok} -> + inet:setopts(Socket, [{active, false}]), + gen_tcp:send(Socket, mod_proxy65_lib:make_reply()), + {next_state, wait_for_activation, StateData#state{sha1=SHA1}}; + _ -> + Err = mod_proxy65_lib:make_error_reply(Request), + gen_tcp:send(Socket, Err), + {stop, normal, StateData} + end; + #s5_request{cmd=udp} -> + Err = mod_proxy65_lib:make_error_reply(Request, ?ERR_COMMAND_NOT_SUPPORTED), + gen_tcp:send(Socket, Err), + {stop, normal, StateData}; + _ -> + {stop, normal, StateData} + end. + +wait_for_activation(_Data, StateData) -> + {next_state, wait_for_activation, StateData}. + +stream_established(_Data, StateData) -> + {next_state, stream_established, StateData}. + +%%%----------------------- +%%% Callbacks processing +%%%----------------------- + +%% SOCKS5 packets. +handle_info({tcp, _S, Data}, StateName, StateData) + when StateName /= wait_for_activation -> + erlang:cancel_timer(StateData#state.timer), + TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop), + gen_fsm:send_event(self(), Data), + {next_state, StateName, StateData#state{timer=TRef}}; + +%% Activation message. +handle_info({activate, PeerPid, PeerSocket, IJid, TJid}, + wait_for_activation, StateData) -> + erlang:monitor(process, PeerPid), + erlang:cancel_timer(StateData#state.timer), + MySocket = StateData#state.socket, + Shaper = StateData#state.shaper, + Host = StateData#state.host, + MaxRate = find_maxrate(Shaper, IJid, TJid, Host), + spawn_link(?MODULE, relay, [MySocket, PeerSocket, MaxRate]), + {next_state, stream_established, StateData#state{active=true}}; + +%% Socket closed +handle_info({tcp_closed, _Socket}, _StateName, StateData) -> + {stop, normal, StateData}; +handle_info({tcp_error, _Socket, _Reason}, _StateName, StateData) -> + {stop, normal, StateData}; + +%% Got stop message. +handle_info(stop, _StateName, StateData) -> + {stop, normal, StateData}; + +%% Either linked process or peer process died. +handle_info({'EXIT',_,_}, _StateName, StateData) -> + {stop, normal, StateData}; +handle_info({'DOWN',_,_,_,_}, _StateName, StateData) -> + {stop, normal, StateData}; + +%% Packets of no interest +handle_info(_Info, StateName, StateData) -> + {next_state, StateName, StateData}. + +%% Socket request. +handle_sync_event(get_socket, _From, wait_for_activation, StateData) -> + Socket = StateData#state.socket, + {reply, Socket, wait_for_activation, StateData}; + +handle_sync_event(_Event, _From, StateName, StateData) -> + {reply, error, StateName, StateData}. + +%%%------------------------------------------------- +%%% Relay Process. +%%%------------------------------------------------- +relay(MySocket, PeerSocket, Shaper) -> + case gen_tcp:recv(MySocket, 0) of + {ok, Data} -> + gen_tcp:send(PeerSocket, Data), + {NewShaper, Pause} = shaper:update(Shaper, size(Data)), + if + Pause > 0 -> timer:sleep(Pause); + true -> pass + end, + relay(MySocket, PeerSocket, NewShaper); + _ -> + stopped + end. + +%%%------------------------ +%%% Auxiliary functions +%%%------------------------ +select_auth_method(plain, AuthMethods) -> + case lists:member(?AUTH_PLAIN, AuthMethods) of + true -> ?AUTH_PLAIN; + false -> ?AUTH_NO_METHODS + end; + +select_auth_method(anonymous, AuthMethods) -> + case lists:member(?AUTH_ANONYMOUS, AuthMethods) of + true -> ?AUTH_ANONYMOUS; + false -> ?AUTH_NO_METHODS + end. + +%% Obviously, we must use shaper with maximum rate. +find_maxrate(Shaper, JID1, JID2, Host) -> + MaxRate1 = shaper:new(acl:match_rule(Host, Shaper, JID1)), + MaxRate2 = shaper:new(acl:match_rule(Host, Shaper, JID2)), + if + MaxRate1 == none; MaxRate2 == none -> + none; + true -> + lists:max([MaxRate1, MaxRate2]) + end. |