diff options
author | Jordan Bracco <href@random.sh> | 2020-05-08 20:10:33 +0200 |
---|---|---|
committer | Jordan Bracco <href@random.sh> | 2020-05-08 20:10:33 +0200 |
commit | b541ef96741129f5aebd2b7526bbca8717cf5c40 (patch) | |
tree | 2cad5e33c2ef3175adeac264453ac22e8b0ed126 /lib | |
parent | Initial commit (diff) |
Switch to atomics, add shared ets, ..
Diffstat (limited to 'lib')
-rw-r--r-- | lib/limiter.ex | 155 |
1 files changed, 133 insertions, 22 deletions
diff --git a/lib/limiter.ex b/lib/limiter.ex index 84c601c..f47e68f 100644 --- a/lib/limiter.ex +++ b/lib/limiter.ex @@ -1,47 +1,158 @@ defmodule Limiter do - @ets __MODULE__.ETS + require Logger - def new(name, max_running, max_waiting) do + @moduledoc """ + # Limiter + + A concurrency limiter. Limits the number of concurrent invocations possible, without using a worker pool or different processes. + + It supports two storage methods: + + * **`[atomics](https://erlang.org/doc/man/atomics.html)`** recommended and default if your OTP is > 21.2. + * **`[ets](https://erlang.org/doc/man/ets.html)`** either with a single table per Limiter (faster) or a shared table (better for a large number of limiters). + + You would however always want to use atomics, ets is mostly there for backwards compatibility. + """ + + @doc """ + Initializes a `Limiter`. + """ + + @spec new(name, max_running, max_waiting, options) :: :ok | {:error, :existing} + when name: atom(), + max_running: non_neg_integer(), + max_waiting: non_neg_integer() | :infinity, + options: [option], + option: {:wait, non_neg_integer()} | backend, + backend: :atomics | ets_backend, + ets_backend: :ets | {:ets, atom()} | {:ets, ets_name :: atom(), ets_options :: []} + def new(name, max_running, max_waiting, options \\ []) do + name = atom_name(name) + + if defined?(name) do + {:error, :existing} + else + wait = Keyword.get(options, :wait, 150) + backend = Keyword.get(options, :backend, default_backend()) + {:ok, backend} = setup_backend(backend) + :persistent_term.put(name, {__MODULE__, max_running, max_waiting, backend, wait}) + :ok + end + end + + @spec set(name, new_max_running, new_max_waiting, options) :: :ok | :error + when name: atom(), + new_max_running: non_neg_integer(), + new_max_waiting: non_neg_integer() | :infinity, + options: [option], + option: {:wait, non_neg_integer()} + @doc "Adjust the limiter limits at runtime" + def set(name, new_max_running, new_max_waiting, options \\ []) do name = atom_name(name) - :persistent_term.put(name, {max_running, max_waiting}) - :ets.new(name, [:public, :named_table]) - :ok + + if defined?(name) do + new_wait = Keyword.get(options, :wait) + {__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name) + + new = + {__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, backend, + new_wait || wait} + + :persistent_term.put(name, new) + :ok + else + :error + end end + @spec limit(atom(), function()) :: {:error, :overload} | any() + @doc "Limits invocation of `fun`." def limit(name, fun) do - {max_running, max_waiting} = :persistent_term.get(atom_name(name)) + do_limit(atom_name(name), fun) + end + + defp do_limit(name, fun) do + {__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name) max = max_running + max_waiting - counter = inc(name) + counter = inc(backend, name) cond do counter <= max_running -> - fun.() + try do + fun.() + after + dec(backend, name) + end counter > max -> + dec(backend, name) {:error, :overload} counter > max_running -> - wait(name, fun) + wait(backend, name, wait, fun) end - after - dec(name) end - defp wait(name, fun) do - Process.sleep(150) - dec(name) - limit(name, fun) + defp wait(backend, name, wait, fun) do + Process.sleep(wait) + dec(backend, name) + do_limit(name, fun) end - defp inc(name) do - name = atom_name(name) - :ets.update_counter(name, name, {2, 1}, {name, 0}) + defp inc({:ets, ets}, name) do + :ets.update_counter(ets, name, {2, 1}, {name, 0}) end - def dec(name) do - name = atom_name(name) - :ets.update_counter(name, name, {2, -1}, {name, 0}) + defp inc({:atomics, ref}, _) do + :atomics.add_get(ref, 1, 1) end - defp atom_name(suffix), do: Module.concat(@ets, suffix) + def dec({:ets, ets}, name) do + :ets.update_counter(ets, name, {2, -1}, {name, 0}) + end + + def dec({:atomics, ref}, _) do + :atomics.sub_get(ref, 1, 1) + end + + defp atom_name(suffix), do: Module.concat(__MODULE__, suffix) + + defp defined?(name) do + {__MODULE__, _, _, _, _, _} = :persistent_term.get(name) + true + rescue + _ -> false + end + + defp default_backend() do + if Code.ensure_loaded?(:atomics) do + :atomics + else + Logger.debug("Limiter: atomics not available, using ETS backend") + :ets + end + end + + defp setup_backend(:ets) do + setup_backend({:ets, ETS}) + end + + defp setup_backend({:ets, name}) do + setup_backend({:ets, name, [{:write_concurrency, true}, {:read_concurrency, true}]}) + end + + defp setup_backend({:ets, name, options}) do + ets_name = atom_name(name) + + case :ets.whereis(ets_name) do + :undefined -> :ets.new(ets_name, [:public, :named_table] ++ options) + _ -> nil + end + + {:ok, {:ets, ets_name}} + end + + defp setup_backend(:atomics) do + {:ok, {:atomics, :atomics.new(1, signed: true)}} + end end |