From fb0bcf9e8d362be4f8b683df881be9225e1950fa Mon Sep 17 00:00:00 2001 From: Jordan Bracco Date: Thu, 14 May 2020 22:38:29 +0200 Subject: Remove ETS backend, add max_retries, add options to limit/3. --- lib/concurrent_limiter.ex | 104 +++++++++++++++------------------------------- 1 file changed, 34 insertions(+), 70 deletions(-) diff --git a/lib/concurrent_limiter.ex b/lib/concurrent_limiter.ex index 9776377..b4572c6 100644 --- a/lib/concurrent_limiter.ex +++ b/lib/concurrent_limiter.ex @@ -6,36 +6,32 @@ defmodule ConcurrentLimiter do A concurrency limiter. Limits the number of concurrent invocations possible, without using a worker pool or different processes. - It can be useful in cases where you don't need a worker pool but still being able to limit concurrent calls without much overhead. As it internally uses `persistent_term` to store metadata, and can fallback to ETS tables, it is however not made for a large number of limiters and cannot be used for things like a per-user rate limiter. - - It supports two storage methods: - - * **[atomics](https://erlang.org/doc/man/atomics.html)** recommended and default if your OTP is > 21.2. - * **[ets](https://erlang.org/doc/man/ets.html)** either with a single table per limiter (faster) or a shared table. - - You would almost always want to use atomics, ets is mostly there for backwards compatibility. + It can be useful in cases where you don't need a worker pool but still being able to limit concurrent calls without much overhead. + As it internally uses `persistent_term` to store metadata, and can fallback to ETS tables, it is however not made for a large number + of limiters and cannot be used for things like a per-user rate limiter. """ @doc """ Initializes a `ConcurrentLimiter`. """ + @default_wait 150 + @default_max_retries 5 + @spec new(name, max_running, max_waiting, options) :: :ok | {:error, :existing} when name: atom(), max_running: non_neg_integer(), max_waiting: non_neg_integer() | :infinity, options: [option], - option: {:wait, non_neg_integer()} | backend, - backend: :atomics | ets_backend, - ets_backend: :ets | {:ets, atom()} | {:ets, atom(), ets_options :: []} + option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()} def new(name, max_running, max_waiting, options \\ []) do name = prefix_name(name) if defined?(name) do {:error, :existing} else - wait = Keyword.get(options, :wait, 150) - backend = Keyword.get(options, :backend, default_backend()) - {:ok, backend} = setup_backend(backend) - :persistent_term.put(name, {__MODULE__, max_running, max_waiting, backend, wait}) + wait = Keyword.get(options, :wait, @default_wait) + max_retries = Keyword.get(options, :max_retries, @default_max_retries) + atomics = :atomics.new(1, [signed: true]) + :persistent_term.put(name, {__MODULE__, max_running, max_waiting, atomics, wait, max_retries}) :ok end end @@ -50,8 +46,9 @@ defmodule ConcurrentLimiter do name = prefix_name(name) if defined?(name) do new_wait = Keyword.get(options, :wait) - {__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name) - new = {__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, backend, new_wait || wait} + new_max_retries = Keyword.get(options, :max_retries) + {__MODULE__, max_running, max_waiting, ref, wait, max_retries} = :persistent_term.get(name) + new = {__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, ref, new_wait || wait, new_max_retries || max_retries} :persistent_term.put(name, new) :ok else @@ -59,53 +56,51 @@ defmodule ConcurrentLimiter do end end - @spec limit(atom(), function()) :: {:error, :overload} | any() + @spec limit(atom(), function(), opts) :: {:error, :overload} | any() when opts: [option], + option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()} @doc "Limits invocation of `fun`." - def limit(name, fun) do - do_limit(prefix_name(name), fun) + def limit(name, fun, opts \\ []) do + do_limit(prefix_name(name), fun, opts, 0) end - defp do_limit(name, fun) do - {__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name) + defp do_limit(name, fun, opts, retries) do + {__MODULE__, max_running, max_waiting, ref, wait, max_retries} = :persistent_term.get(name) max = max_running + max_waiting - counter = inc(backend, name) + counter = inc(ref, name) + max_retries = Keyword.get(opts, :max_retries) || max_retries cond do counter <= max_running -> try do fun.() after - dec(backend, name) + dec(ref, name) end counter > max -> - dec(backend, name) + dec(ref, name) + {:error, :overload} + + retries + 1 > max_retries -> {:error, :overload} counter > max_running -> - wait(backend, name, wait, fun) + wait(ref, name, wait, fun, opts, max_retries, retries + 1) end end - defp wait(backend, name, wait, fun) do + defp wait(ref, name, wait, fun, opts, max_retries, retries) do + wait = Keyword.get(opts, :timeout) || wait Process.sleep(wait) - dec(backend, name) - do_limit(name, fun) + dec(ref, name) + do_limit(name, fun, opts, retries) end - defp inc({:ets, ets}, name) do - :ets.update_counter(ets, name, {2, 1}, {name, 0}) - end - - defp inc({:atomics, ref}, _) do + defp inc(ref, _) do :atomics.add_get(ref, 1, 1) end - defp dec({:ets, ets}, name) do - :ets.update_counter(ets, name, {2, -1}, {name, 0}) - end - - defp dec({:atomics, ref}, _) do + defp dec(ref, _) do :atomics.sub_get(ref, 1, 1) end @@ -118,35 +113,4 @@ defmodule ConcurrentLimiter do _ -> false end - defp default_backend() do - if Code.ensure_loaded?(:atomics) do - :atomics - else - Logger.debug("ConcurrentLimiter: atomics not available, using ETS backend") - :ets - end - end - - defp setup_backend(:ets) do - setup_backend({:ets, ETS}) - end - - defp setup_backend({:ets, name}) do - setup_backend({:ets, name, [{:write_concurrency, true}, {:read_concurrency, true}]}) - end - - defp setup_backend({:ets, name, options}) do - ets_name = prefix_name(name) - - case :ets.whereis(ets_name) do - :undefined -> :ets.new(ets_name, [:public, :named_table] ++ options) - _ -> nil - end - {:ok, {:ets, ets_name}} - end - - defp setup_backend(:atomics) do - {:ok, {:atomics, :atomics.new(1, [signed: true])}} - end - end -- cgit v1.2.3