aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_sql.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_sql.erl')
-rw-r--r--src/ejabberd_sql.erl75
1 files changed, 35 insertions, 40 deletions
diff --git a/src/ejabberd_sql.erl b/src/ejabberd_sql.erl
index 51ff9d436..616b6b73a 100644
--- a/src/ejabberd_sql.erl
+++ b/src/ejabberd_sql.erl
@@ -75,8 +75,7 @@
db_version = undefined :: undefined | non_neg_integer(),
start_interval = 0 :: non_neg_integer(),
host = <<"">> :: binary(),
- max_pending_requests_len :: non_neg_integer(),
- pending_requests = {0, queue:new()} :: {non_neg_integer(), ?TQUEUE}}).
+ pending_requests :: p1_queue:queue()}).
-define(STATE_KEY, ejabberd_sql_state).
@@ -271,10 +270,16 @@ init([Host, StartInterval]) ->
[DBType | _] = db_opts(Host),
(?GEN_FSM):send_event(self(), connect),
ejabberd_sql_sup:add_pid(Host, self()),
+ QueueType = case ejabberd_config:get_option(
+ {sql_queue_type, Host}, opt_type(sql_queue_type)) of
+ undefined ->
+ ejabberd_config:default_queue_type(Host);
+ Type ->
+ Type
+ end,
{ok, connecting,
#state{db_type = DBType, host = Host,
- max_pending_requests_len = max_fsm_queue(),
- pending_requests = {0, queue:new()},
+ pending_requests = p1_queue:new(QueueType, max_fsm_queue()),
start_interval = StartInterval}}.
connecting(connect, #state{host = Host} = State) ->
@@ -285,16 +290,17 @@ connecting(connect, #state{host = Host} = State) ->
[mssql | Args] -> apply(fun odbc_connect/1, Args);
[odbc | Args] -> apply(fun odbc_connect/1, Args)
end,
- {_, PendingRequests} = State#state.pending_requests,
case ConnectRes of
{ok, Ref} ->
erlang:monitor(process, Ref),
- lists:foreach(fun (Req) ->
- (?GEN_FSM):send_event(self(), Req)
- end,
- queue:to_list(PendingRequests)),
+ PendingRequests =
+ p1_queue:dropwhile(
+ fun(Req) ->
+ ?GEN_FSM:send_event(self(), Req),
+ true
+ end, State#state.pending_requests),
State1 = State#state{db_ref = Ref,
- pending_requests = {0, queue:new()}},
+ pending_requests = PendingRequests},
State2 = get_db_version(State1),
{next_state, session_established, State2};
{error, Reason} ->
@@ -321,26 +327,20 @@ connecting({sql_cmd, Command, Timestamp} = Req, From,
State) ->
?DEBUG("queuing pending request while connecting:~n\t~p",
[Req]),
- {Len, PendingRequests} = State#state.pending_requests,
- NewPendingRequests = if Len <
- State#state.max_pending_requests_len ->
- {Len + 1,
- queue:in({sql_cmd, Command, From, Timestamp},
- PendingRequests)};
- true ->
- lists:foreach(fun ({sql_cmd, _, To,
- _Timestamp}) ->
- (?GEN_FSM):reply(To,
- {error,
- <<"SQL connection failed">>})
- end,
- queue:to_list(PendingRequests)),
- {1,
- queue:from_list([{sql_cmd, Command, From,
- Timestamp}])}
- end,
+ PendingRequests =
+ try p1_queue:in({sql_cmd, Command, From, Timestamp},
+ State#state.pending_requests)
+ catch error:full ->
+ Q = p1_queue:dropwhile(
+ fun({sql_cmd, _, To, _Timestamp}) ->
+ (?GEN_FSM):reply(
+ To, {error, <<"SQL connection failed">>}),
+ true
+ end, State#state.pending_requests),
+ p1_queue:in({sql_cmd, Command, From, Timestamp}, Q)
+ end,
{next_state, connecting,
- State#state{pending_requests = NewPendingRequests}};
+ State#state{pending_requests = PendingRequests}};
connecting(Request, {Who, _Ref}, State) ->
?WARNING_MSG("unexpected call ~p from ~p in 'connecting'",
[Request, Who]),
@@ -1068,15 +1068,10 @@ odbcinst_config() ->
filename:join(tmp_dir(), "odbcinst.ini").
max_fsm_queue() ->
- ejabberd_config:get_option(
- max_fsm_queue,
- fun(N) when is_integer(N), N > 0 -> N end).
+ proplists:get_value(max_queue, fsm_limit_opts(), unlimited).
fsm_limit_opts() ->
- case max_fsm_queue() of
- N when is_integer(N) -> [{max_queue, N}];
- _ -> []
- end.
+ ejabberd_config:fsm_limit_opts([]).
check_error({error, Why} = Err, #sql_query{} = Query) ->
?ERROR_MSG("SQL query '~s' at ~p failed: ~p",
@@ -1093,8 +1088,6 @@ check_error({error, Why} = Err, Query) ->
check_error(Result, _Query) ->
Result.
-opt_type(max_fsm_queue) ->
- fun (N) when is_integer(N), N > 0 -> N end;
opt_type(sql_database) -> fun iolist_to_binary/1;
opt_type(sql_keepalive_interval) ->
fun (I) when is_integer(I), I > 0 -> I end;
@@ -1114,8 +1107,10 @@ opt_type(sql_ssl) -> fun(B) when is_boolean(B) -> B end;
opt_type(sql_ssl_verify) -> fun(B) when is_boolean(B) -> B end;
opt_type(sql_ssl_certfile) -> fun iolist_to_binary/1;
opt_type(sql_ssl_cafile) -> fun iolist_to_binary/1;
+opt_type(sql_queue_type) ->
+ fun(ram) -> ram; (file) -> file end;
opt_type(_) ->
- [max_fsm_queue, sql_database, sql_keepalive_interval,
+ [sql_database, sql_keepalive_interval,
sql_password, sql_port, sql_server, sql_type,
sql_username, sql_ssl, sql_ssl_verify, sql_ssl_cerfile,
- sql_ssl_cafile].
+ sql_ssl_cafile, sql_queue_type].