diff options
Diffstat (limited to 'src/ejabberd_sql.erl')
-rw-r--r-- | src/ejabberd_sql.erl | 75 |
1 files changed, 35 insertions, 40 deletions
diff --git a/src/ejabberd_sql.erl b/src/ejabberd_sql.erl index 51ff9d436..616b6b73a 100644 --- a/src/ejabberd_sql.erl +++ b/src/ejabberd_sql.erl @@ -75,8 +75,7 @@ db_version = undefined :: undefined | non_neg_integer(), start_interval = 0 :: non_neg_integer(), host = <<"">> :: binary(), - max_pending_requests_len :: non_neg_integer(), - pending_requests = {0, queue:new()} :: {non_neg_integer(), ?TQUEUE}}). + pending_requests :: p1_queue:queue()}). -define(STATE_KEY, ejabberd_sql_state). @@ -271,10 +270,16 @@ init([Host, StartInterval]) -> [DBType | _] = db_opts(Host), (?GEN_FSM):send_event(self(), connect), ejabberd_sql_sup:add_pid(Host, self()), + QueueType = case ejabberd_config:get_option( + {sql_queue_type, Host}, opt_type(sql_queue_type)) of + undefined -> + ejabberd_config:default_queue_type(Host); + Type -> + Type + end, {ok, connecting, #state{db_type = DBType, host = Host, - max_pending_requests_len = max_fsm_queue(), - pending_requests = {0, queue:new()}, + pending_requests = p1_queue:new(QueueType, max_fsm_queue()), start_interval = StartInterval}}. connecting(connect, #state{host = Host} = State) -> @@ -285,16 +290,17 @@ connecting(connect, #state{host = Host} = State) -> [mssql | Args] -> apply(fun odbc_connect/1, Args); [odbc | Args] -> apply(fun odbc_connect/1, Args) end, - {_, PendingRequests} = State#state.pending_requests, case ConnectRes of {ok, Ref} -> erlang:monitor(process, Ref), - lists:foreach(fun (Req) -> - (?GEN_FSM):send_event(self(), Req) - end, - queue:to_list(PendingRequests)), + PendingRequests = + p1_queue:dropwhile( + fun(Req) -> + ?GEN_FSM:send_event(self(), Req), + true + end, State#state.pending_requests), State1 = State#state{db_ref = Ref, - pending_requests = {0, queue:new()}}, + pending_requests = PendingRequests}, State2 = get_db_version(State1), {next_state, session_established, State2}; {error, Reason} -> @@ -321,26 +327,20 @@ connecting({sql_cmd, Command, Timestamp} = Req, From, State) -> ?DEBUG("queuing pending request while connecting:~n\t~p", [Req]), - {Len, PendingRequests} = State#state.pending_requests, - NewPendingRequests = if Len < - State#state.max_pending_requests_len -> - {Len + 1, - queue:in({sql_cmd, Command, From, Timestamp}, - PendingRequests)}; - true -> - lists:foreach(fun ({sql_cmd, _, To, - _Timestamp}) -> - (?GEN_FSM):reply(To, - {error, - <<"SQL connection failed">>}) - end, - queue:to_list(PendingRequests)), - {1, - queue:from_list([{sql_cmd, Command, From, - Timestamp}])} - end, + PendingRequests = + try p1_queue:in({sql_cmd, Command, From, Timestamp}, + State#state.pending_requests) + catch error:full -> + Q = p1_queue:dropwhile( + fun({sql_cmd, _, To, _Timestamp}) -> + (?GEN_FSM):reply( + To, {error, <<"SQL connection failed">>}), + true + end, State#state.pending_requests), + p1_queue:in({sql_cmd, Command, From, Timestamp}, Q) + end, {next_state, connecting, - State#state{pending_requests = NewPendingRequests}}; + State#state{pending_requests = PendingRequests}}; connecting(Request, {Who, _Ref}, State) -> ?WARNING_MSG("unexpected call ~p from ~p in 'connecting'", [Request, Who]), @@ -1068,15 +1068,10 @@ odbcinst_config() -> filename:join(tmp_dir(), "odbcinst.ini"). max_fsm_queue() -> - ejabberd_config:get_option( - max_fsm_queue, - fun(N) when is_integer(N), N > 0 -> N end). + proplists:get_value(max_queue, fsm_limit_opts(), unlimited). fsm_limit_opts() -> - case max_fsm_queue() of - N when is_integer(N) -> [{max_queue, N}]; - _ -> [] - end. + ejabberd_config:fsm_limit_opts([]). check_error({error, Why} = Err, #sql_query{} = Query) -> ?ERROR_MSG("SQL query '~s' at ~p failed: ~p", @@ -1093,8 +1088,6 @@ check_error({error, Why} = Err, Query) -> check_error(Result, _Query) -> Result. -opt_type(max_fsm_queue) -> - fun (N) when is_integer(N), N > 0 -> N end; opt_type(sql_database) -> fun iolist_to_binary/1; opt_type(sql_keepalive_interval) -> fun (I) when is_integer(I), I > 0 -> I end; @@ -1114,8 +1107,10 @@ opt_type(sql_ssl) -> fun(B) when is_boolean(B) -> B end; opt_type(sql_ssl_verify) -> fun(B) when is_boolean(B) -> B end; opt_type(sql_ssl_certfile) -> fun iolist_to_binary/1; opt_type(sql_ssl_cafile) -> fun iolist_to_binary/1; +opt_type(sql_queue_type) -> + fun(ram) -> ram; (file) -> file end; opt_type(_) -> - [max_fsm_queue, sql_database, sql_keepalive_interval, + [sql_database, sql_keepalive_interval, sql_password, sql_port, sql_server, sql_type, sql_username, sql_ssl, sql_ssl_verify, sql_ssl_cerfile, - sql_ssl_cafile]. + sql_ssl_cafile, sql_queue_type]. |