diff options
Diffstat (limited to 'src/mod_proxy65/mod_proxy65_sm.erl')
-rw-r--r-- | src/mod_proxy65/mod_proxy65_sm.erl | 171 |
1 files changed, 82 insertions, 89 deletions
diff --git a/src/mod_proxy65/mod_proxy65_sm.erl b/src/mod_proxy65/mod_proxy65_sm.erl index b5af45ab..fa9d257e 100644 --- a/src/mod_proxy65/mod_proxy65_sm.erl +++ b/src/mod_proxy65/mod_proxy65_sm.erl @@ -25,96 +25,91 @@ %%%---------------------------------------------------------------------- -module(mod_proxy65_sm). + -author('xram@jabber.ru'). -behaviour(gen_server). %% gen_server callbacks. --export([init/1, - handle_info/2, - handle_call/3, - handle_cast/2, - terminate/2, - code_change/3 - ]). +-export([init/1, handle_info/2, handle_call/3, + handle_cast/2, terminate/2, code_change/3]). %% API. --export([ - start_link/2, - register_stream/1, - unregister_stream/1, - activate_stream/4 - ]). - --record(state, {max_connections}). --record(bytestream, { - sha1, %% SHA1 key - target, %% Target Pid - initiator, %% Initiator Pid - active = false, %% Activity flag - jid_i %% Initiator's JID - }). +-export([start_link/2, register_stream/1, + unregister_stream/1, activate_stream/4]). + +-record(state, {max_connections = infinity :: non_neg_integer() | infinity}). + +-include("jlib.hrl"). + +-record(bytestream, + {sha1 = <<"">> :: binary() | '$1', + target :: pid() | '_', + initiator :: pid() | '_', + active = false :: boolean() | '_', + jid_i = {<<"">>, <<"">>, <<"">>} :: ljid() | '_'}). -define(PROCNAME, ejabberd_mod_proxy65_sm). %% Unused callbacks. -handle_cast(_Request, State) -> - {noreply, State}. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. -handle_info(_Info, State) -> - {noreply, State}. +handle_cast(_Request, State) -> {noreply, State}. + +code_change(_OldVsn, State, _Extra) -> {ok, State}. + +handle_info(_Info, State) -> {noreply, State}. + %%---------------- start_link(Host, Opts) -> Proc = gen_mod:get_module_proc(Host, ?PROCNAME), - gen_server:start_link({local, Proc}, ?MODULE, [Opts], []). + gen_server:start_link({local, Proc}, ?MODULE, [Opts], + []). init([Opts]) -> mnesia:create_table(bytestream, [{ram_copies, [node()]}, {attributes, record_info(fields, bytestream)}]), mnesia:add_table_copy(bytestream, node(), ram_copies), - MaxConnections = gen_mod:get_opt(max_connections, Opts, infinity), - {ok, #state{max_connections=MaxConnections}}. + MaxConnections = gen_mod:get_opt(max_connections, Opts, + fun(I) when is_integer(I), I>0 -> + I; + (infinity) -> + infinity + end, infinity), + {ok, #state{max_connections = MaxConnections}}. -terminate(_Reason, _State) -> - ok. +terminate(_Reason, _State) -> ok. handle_call({activate, SHA1, IJid}, _From, State) -> MaxConns = State#state.max_connections, - F = fun() -> + F = fun () -> case mnesia:read(bytestream, SHA1, write) of - [#bytestream{target = TPid, initiator = IPid} = ByteStream] - when is_pid(TPid), is_pid(IPid) -> - ActiveFlag = ByteStream#bytestream.active, - if - ActiveFlag == false -> - ConnsPerJID = - mnesia:select(bytestream, - [{#bytestream{sha1 = '$1', - jid_i = IJid, - _='_'}, - [], - ['$1']}]), - if - length(ConnsPerJID) < MaxConns -> - mnesia:write( - ByteStream#bytestream{active = true, - jid_i = IJid}), - {ok, IPid, TPid}; - true -> - {limit, IPid, TPid} - end; - true -> - conflict - end; - _ -> - false + [#bytestream{target = TPid, initiator = IPid} = + ByteStream] + when is_pid(TPid), is_pid(IPid) -> + ActiveFlag = ByteStream#bytestream.active, + if ActiveFlag == false -> + ConnsPerJID = mnesia:select(bytestream, + [{#bytestream{sha1 = + '$1', + jid_i = + IJid, + _ = '_'}, + [], ['$1']}]), + if length(ConnsPerJID) < MaxConns -> + mnesia:write(ByteStream#bytestream{active = + true, + jid_i = + IJid}), + {ok, IPid, TPid}; + true -> {limit, IPid, TPid} + end; + true -> conflict + end; + _ -> false end end, Reply = mnesia:transaction(F), {reply, Reply, State}; - handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -127,20 +122,19 @@ handle_call(_Request, _From, State) -> %%% transaction abort %%% SHA1 = string() %%%--------------------------------------------------- -register_stream(SHA1) when is_list(SHA1) -> +register_stream(SHA1) when is_binary(SHA1) -> StreamPid = self(), - F = fun() -> + F = fun () -> case mnesia:read(bytestream, SHA1, write) of - [] -> - mnesia:write(#bytestream{sha1 = SHA1, - target = StreamPid}); - [#bytestream{target = Pid, - initiator = undefined} = ByteStream] - when is_pid(Pid), Pid /= StreamPid -> - mnesia:write( - ByteStream#bytestream{initiator = StreamPid}); - _ -> - error + [] -> + mnesia:write(#bytestream{sha1 = SHA1, + target = StreamPid}); + [#bytestream{target = Pid, initiator = undefined} = + ByteStream] + when is_pid(Pid), Pid /= StreamPid -> + mnesia:write(ByteStream#bytestream{initiator = + StreamPid}); + _ -> error end end, mnesia:transaction(F). @@ -149,8 +143,8 @@ register_stream(SHA1) when is_list(SHA1) -> %%% unregister_stream(SHA1) -> ok | transaction abort %%% SHA1 = string() %%%---------------------------------------------------- -unregister_stream(SHA1) when is_list(SHA1) -> - F = fun() -> mnesia:delete({bytestream, SHA1}) end, +unregister_stream(SHA1) when is_binary(SHA1) -> + F = fun () -> mnesia:delete({bytestream, SHA1}) end, mnesia:transaction(F). %%%-------------------------------------------------------- @@ -163,19 +157,18 @@ unregister_stream(SHA1) when is_list(SHA1) -> %%% IJid = TJid = jid() %%% Host = string() %%%-------------------------------------------------------- -activate_stream(SHA1, IJid, TJid, Host) when is_list(SHA1) -> +activate_stream(SHA1, IJid, TJid, Host) + when is_binary(SHA1) -> Proc = gen_mod:get_module_proc(Host, ?PROCNAME), - case catch gen_server:call(Proc, {activate, SHA1, IJid}) of - {atomic, {ok, IPid, TPid}} -> - mod_proxy65_stream:activate({IPid, IJid}, {TPid, TJid}); - {atomic, {limit, IPid, TPid}} -> - mod_proxy65_stream:stop(IPid), - mod_proxy65_stream:stop(TPid), - limit; - {atomic, conflict} -> - conflict; - {atomic, false} -> - false; - _ -> - error + case catch gen_server:call(Proc, {activate, SHA1, IJid}) + of + {atomic, {ok, IPid, TPid}} -> + mod_proxy65_stream:activate({IPid, IJid}, {TPid, TJid}); + {atomic, {limit, IPid, TPid}} -> + mod_proxy65_stream:stop(IPid), + mod_proxy65_stream:stop(TPid), + limit; + {atomic, conflict} -> conflict; + {atomic, false} -> false; + _ -> error end. |