summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJordan Bracco <href@random.sh>2020-05-08 20:10:33 +0200
committerJordan Bracco <href@random.sh>2020-05-08 20:10:33 +0200
commitb541ef96741129f5aebd2b7526bbca8717cf5c40 (patch)
tree2cad5e33c2ef3175adeac264453ac22e8b0ed126
parentInitial commit (diff)
Switch to atomics, add shared ets, ..
-rw-r--r--README.md3
-rw-r--r--lib/limiter.ex155
-rw-r--r--mix.exs5
-rw-r--r--mix.lock5
-rw-r--r--test/limiter_test.exs36
-rw-r--r--test/samples/limiter.exs33
-rw-r--r--test/samples/multi_limiter.exs56
-rw-r--r--test/samples/results_multi_limiter.txt86
-rw-r--r--test/samples/update_counter.exs19
9 files changed, 337 insertions, 61 deletions
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..2e214c3
--- /dev/null
+++ b/README.md
@@ -0,0 +1,3 @@
+# Limiter
+
+See the docs in `lib/limiter.ex`.
diff --git a/lib/limiter.ex b/lib/limiter.ex
index 84c601c..f47e68f 100644
--- a/lib/limiter.ex
+++ b/lib/limiter.ex
@@ -1,47 +1,158 @@
defmodule Limiter do
- @ets __MODULE__.ETS
+ require Logger
- def new(name, max_running, max_waiting) do
+ @moduledoc """
+ # Limiter
+
+ A concurrency limiter. Limits the number of concurrent invocations possible, without using a worker pool or different processes.
+
+ It supports two storage methods:
+
+ * **`[atomics](https://erlang.org/doc/man/atomics.html)`** recommended and default if your OTP is > 21.2.
+ * **`[ets](https://erlang.org/doc/man/ets.html)`** either with a single table per Limiter (faster) or a shared table (better for a large number of limiters).
+
+ You would however always want to use atomics, ets is mostly there for backwards compatibility.
+ """
+
+ @doc """
+ Initializes a `Limiter`.
+ """
+
+ @spec new(name, max_running, max_waiting, options) :: :ok | {:error, :existing}
+ when name: atom(),
+ max_running: non_neg_integer(),
+ max_waiting: non_neg_integer() | :infinity,
+ options: [option],
+ option: {:wait, non_neg_integer()} | backend,
+ backend: :atomics | ets_backend,
+ ets_backend: :ets | {:ets, atom()} | {:ets, ets_name :: atom(), ets_options :: []}
+ def new(name, max_running, max_waiting, options \\ []) do
+ name = atom_name(name)
+
+ if defined?(name) do
+ {:error, :existing}
+ else
+ wait = Keyword.get(options, :wait, 150)
+ backend = Keyword.get(options, :backend, default_backend())
+ {:ok, backend} = setup_backend(backend)
+ :persistent_term.put(name, {__MODULE__, max_running, max_waiting, backend, wait})
+ :ok
+ end
+ end
+
+ @spec set(name, new_max_running, new_max_waiting, options) :: :ok | :error
+ when name: atom(),
+ new_max_running: non_neg_integer(),
+ new_max_waiting: non_neg_integer() | :infinity,
+ options: [option],
+ option: {:wait, non_neg_integer()}
+ @doc "Adjust the limiter limits at runtime"
+ def set(name, new_max_running, new_max_waiting, options \\ []) do
name = atom_name(name)
- :persistent_term.put(name, {max_running, max_waiting})
- :ets.new(name, [:public, :named_table])
- :ok
+
+ if defined?(name) do
+ new_wait = Keyword.get(options, :wait)
+ {__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name)
+
+ new =
+ {__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, backend,
+ new_wait || wait}
+
+ :persistent_term.put(name, new)
+ :ok
+ else
+ :error
+ end
end
+ @spec limit(atom(), function()) :: {:error, :overload} | any()
+ @doc "Limits invocation of `fun`."
def limit(name, fun) do
- {max_running, max_waiting} = :persistent_term.get(atom_name(name))
+ do_limit(atom_name(name), fun)
+ end
+
+ defp do_limit(name, fun) do
+ {__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name)
max = max_running + max_waiting
- counter = inc(name)
+ counter = inc(backend, name)
cond do
counter <= max_running ->
- fun.()
+ try do
+ fun.()
+ after
+ dec(backend, name)
+ end
counter > max ->
+ dec(backend, name)
{:error, :overload}
counter > max_running ->
- wait(name, fun)
+ wait(backend, name, wait, fun)
end
- after
- dec(name)
end
- defp wait(name, fun) do
- Process.sleep(150)
- dec(name)
- limit(name, fun)
+ defp wait(backend, name, wait, fun) do
+ Process.sleep(wait)
+ dec(backend, name)
+ do_limit(name, fun)
end
- defp inc(name) do
- name = atom_name(name)
- :ets.update_counter(name, name, {2, 1}, {name, 0})
+ defp inc({:ets, ets}, name) do
+ :ets.update_counter(ets, name, {2, 1}, {name, 0})
end
- def dec(name) do
- name = atom_name(name)
- :ets.update_counter(name, name, {2, -1}, {name, 0})
+ defp inc({:atomics, ref}, _) do
+ :atomics.add_get(ref, 1, 1)
end
- defp atom_name(suffix), do: Module.concat(@ets, suffix)
+ def dec({:ets, ets}, name) do
+ :ets.update_counter(ets, name, {2, -1}, {name, 0})
+ end
+
+ def dec({:atomics, ref}, _) do
+ :atomics.sub_get(ref, 1, 1)
+ end
+
+ defp atom_name(suffix), do: Module.concat(__MODULE__, suffix)
+
+ defp defined?(name) do
+ {__MODULE__, _, _, _, _, _} = :persistent_term.get(name)
+ true
+ rescue
+ _ -> false
+ end
+
+ defp default_backend() do
+ if Code.ensure_loaded?(:atomics) do
+ :atomics
+ else
+ Logger.debug("Limiter: atomics not available, using ETS backend")
+ :ets
+ end
+ end
+
+ defp setup_backend(:ets) do
+ setup_backend({:ets, ETS})
+ end
+
+ defp setup_backend({:ets, name}) do
+ setup_backend({:ets, name, [{:write_concurrency, true}, {:read_concurrency, true}]})
+ end
+
+ defp setup_backend({:ets, name, options}) do
+ ets_name = atom_name(name)
+
+ case :ets.whereis(ets_name) do
+ :undefined -> :ets.new(ets_name, [:public, :named_table] ++ options)
+ _ -> nil
+ end
+
+ {:ok, {:ets, ets_name}}
+ end
+
+ defp setup_backend(:atomics) do
+ {:ok, {:atomics, :atomics.new(1, signed: true)}}
+ end
end
diff --git a/mix.exs b/mix.exs
index f3ff456..0ff4802 100644
--- a/mix.exs
+++ b/mix.exs
@@ -11,19 +11,16 @@ defmodule Limiter.MixProject do
]
end
- # Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger]
]
end
- # Run "mix help deps" to learn about dependencies.
defp deps do
[
+ {:ex_doc, "~> 0.21", only: :dev, runtime: false},
{:benchee, "~> 1.0", only: [:dev, :test]}
- # {:dep_from_hexpm, "~> 0.3.0"},
- # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
]
end
end
diff --git a/mix.lock b/mix.lock
index 5aa28d0..fa32375 100644
--- a/mix.lock
+++ b/mix.lock
@@ -1,4 +1,9 @@
%{
"benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
+ "earmark": {:hex, :earmark, "1.4.4", "4821b8d05cda507189d51f2caeef370cf1e18ca5d7dfb7d31e9cafe6688106a4", [:mix], [], "hexpm", "1f93aba7340574847c0f609da787f0d79efcab51b044bb6e242cae5aca9d264d"},
+ "ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "0db1ee8d1547ab4877c5b5dffc6604ef9454e189928d5ba8967d4a58a801f161"},
+ "makeup": {:hex, :makeup, "1.0.1", "82f332e461dc6c79dbd82fbe2a9c10d48ed07146f0a478286e590c83c52010b5", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "49736fe5b66a08d8575bf5321d716bac5da20c8e6b97714fec2bcd6febcfa1f8"},
+ "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"},
+ "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"},
}
diff --git a/test/limiter_test.exs b/test/limiter_test.exs
index c60db9e..1b81ead 100644
--- a/test/limiter_test.exs
+++ b/test/limiter_test.exs
@@ -2,38 +2,20 @@ defmodule LimiterTest do
use ExUnit.Case
doctest Limiter
- defp test_ets(name, max, sleep, fun) do
- count = :ets.update_counter(:limiter_test, name, {2, 1}, {name, 0})
-
- if count <= max do
- fun.({:ok, count})
- Process.sleep(sleep)
- else
- fun.(:fail)
- end
- after
- :ets.update_counter(:limiter_test, name, {2, -1}, {name, 1})
+ test "limiter ets is atomic" do
+ name = "test1"
+ Limiter.new(name, 2, 2)
+ atomic_test(name)
end
- test "limits with ets" do
- :ets.new(:limiter_test, [:public, :named_table])
- ets = "test"
- test = self()
- spawn_link(fn -> test_ets(ets, 2, 500, fn result -> send(test, result) end) end)
- spawn_link(fn -> test_ets(ets, 2, 750, fn result -> send(test, result) end) end)
- spawn_link(fn -> test_ets(ets, 2, 500, fn result -> send(test, result) end) end)
- assert_receive {:ok, 1}
- assert_receive {:ok, 2}
- assert_receive :fail
- Process.sleep(500)
- spawn_link(fn -> test_ets(ets, 2, 500, fn result -> send(test, result) end) end)
- assert_receive {:ok, 2}
+ test "limiter atomics is atomic" do
+ name = "test2"
+ Limiter.new(name, 2, 2, backend: :atomics)
+ atomic_test(name)
end
- test "limiter" do
- name = "test1"
+ defp atomic_test(name) do
self = self()
- Limiter.set(name, 2, 2)
sleepy = fn sleep ->
case Limiter.limit(name, fn ->
diff --git a/test/samples/limiter.exs b/test/samples/limiter.exs
index 896056d..785c85f 100644
--- a/test/samples/limiter.exs
+++ b/test/samples/limiter.exs
@@ -1,11 +1,28 @@
-:ets.new(:limiter_bench, [:public, :named_table])
-Limiter.new(:bench, 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000, 0)
+infinite = 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000
+Limiter.new(:bench, infinite, 0)
+Limiter.new(:bench_s, infinite, 0, ets: LimiterTest)
-Benchee.run(%{
- "update_counter" => fn ->
- :ets.update_counter(:limiter_bench, "bench", {2, 1}, {"bench", 0})
- end,
- "limit" => fn ->
+concurrent = [{:read_concurrency, true}, {:write_concurrency, true}]
+
+Limiter.new(:bench_rw, infinite, 0)
+Limiter.new(:bench_s_rw, infinite, 0, ets: LimiterTest, ets_opts: concurrent)
+
+single = %{
+ "Limiter.limit/2" => fn ->
Limiter.limit(:bench, fn -> :ok end)
+ end,
+ "Limiter.limit/2 with concurrency" => fn ->
+ Limiter.limit(:bench_rw, fn -> :ok end)
+ end,
+ "Limiter:limit/2 with shared ets" => fn ->
+ Limiter.limit(:bench_s, fn -> :ok end)
+ end,
+ "Limiter:limit/2 with shared ets and concurrency" => fn ->
+ Limiter.limit(:bench_s_rw, fn -> :ok end)
end
-})
+}
+
+IO.puts("\n\n\n\nsingle, sequential\n\n\n\n")
+Benchee.run(single, parallel: 1)
+IO.puts("\n\n\n\nsingle, parallel\n\n\n\n")
+Benchee.run(single, parallel: System.schedulers_online())
diff --git a/test/samples/multi_limiter.exs b/test/samples/multi_limiter.exs
new file mode 100644
index 0000000..abec65d
--- /dev/null
+++ b/test/samples/multi_limiter.exs
@@ -0,0 +1,56 @@
+infinite = 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000
+
+Limiter.new(:bench_u_0, infinite, 0, backend: {:ets, LimiterTest0, []})
+Limiter.new(:bench_u_1, infinite, 0, backend: {:ets, LimiterTest1, []})
+Limiter.new(:bench_u_2, infinite, 0, backend: {:ets, LimiterTest2, []})
+Limiter.new(:bench_u_3, infinite, 0, backend: {:ets, LimiterTest3, []})
+
+Limiter.new(:bench_a_0, infinite, 0, backend: :atomics)
+Limiter.new(:bench_a_1, infinite, 0, backend: :atomics)
+Limiter.new(:bench_a_2, infinite, 0, backend: :atomics)
+Limiter.new(:bench_a_3, infinite, 0, backend: :atomics)
+
+Limiter.new(:bench_s_0, infinite, 0, backend: {:ets, LimiterTest, []})
+Limiter.new(:bench_s_1, infinite, 0, backend: {:ets, LimiterTest, []})
+Limiter.new(:bench_s_2, infinite, 0, backend: {:ets, LimiterTest, []})
+Limiter.new(:bench_s_3, infinite, 0, backend: {:ets, LimiterTest, []})
+
+rw = [{:read_concurrency, true}, {:write_concurrency, true}]
+
+Limiter.new(:bench_u_rw0, infinite, 0, backend: {:ets, LimiterTestRW0, rw})
+Limiter.new(:bench_u_rw1, infinite, 0, backend: {:ets, LimiterTestRW1, rw})
+Limiter.new(:bench_u_rw2, infinite, 0, backend: {:ets, LimiterTestRW2, rw})
+Limiter.new(:bench_u_rw3, infinite, 0, backend: {:ets, LimiterTestRW3, rw})
+
+Limiter.new(:bench_s_rw0, infinite, 0, backend: {:ets, LimiterTestRW, rw})
+Limiter.new(:bench_s_rw1, infinite, 0, backend: {:ets, LimiterTestRW, rw})
+Limiter.new(:bench_s_rw2, infinite, 0, backend: {:ets, LimiterTestRW, rw})
+Limiter.new(:bench_s_rw3, infinite, 0, backend: {:ets, LimiterTestRW, rw})
+
+multiple = %{
+ "Limiter.limit/2 unique ets" => fn ->
+ limiter = Enum.random([:bench_u_0, :bench_u_1, :bench_u_2, :bench_u_3])
+ Limiter.limit(limiter, fn -> :ok end)
+ end,
+ "Limiter:limit/2 shared ets" => fn ->
+ limiter = Enum.random([:bench_s_0, :bench_s_1, :bench_s_2, :bench_s_3])
+ Limiter.limit(limiter, fn -> :ok end)
+ end,
+ "Limiter.limit/2 unique ets, concurrency" => fn ->
+ limiter = Enum.random([:bench_u_rw0, :bench_u_rw1, :bench_u_rw2, :bench_u_rw3])
+ Limiter.limit(limiter, fn -> :ok end)
+ end,
+ "Limiter:limit/2 shared ets, concurrency" => fn ->
+ limiter = Enum.random([:bench_s_rw0, :bench_s_rw1, :bench_s_rw2, :bench_s_rw3])
+ Limiter.limit(limiter, fn -> :ok end)
+ end,
+ "Limiter:limit/2 atomics" => fn ->
+ limiter = Enum.random([:bench_a_0, :bench_a_1, :bench_a_2, :bench_a_3])
+ Limiter.limit(limiter, fn -> :ok end)
+ end
+}
+
+IO.puts("\n\n\n\nmulti, sequential\n\n\n\n")
+Benchee.run(multiple)
+IO.puts("\n\n\n\nmulti, parallel\n\n\n\n")
+Benchee.run(multiple, parallel: System.schedulers_online())
diff --git a/test/samples/results_multi_limiter.txt b/test/samples/results_multi_limiter.txt
new file mode 100644
index 0000000..112c325
--- /dev/null
+++ b/test/samples/results_multi_limiter.txt
@@ -0,0 +1,86 @@
+
+
+
+
+multi, sequential
+
+
+
+
+Operating System: Linux
+CPU Information: AMD EPYC 7401P 24-Core Processor
+Number of Available Cores: 8
+Available memory: 31.41 GB
+Elixir 1.10.3
+Erlang 22.3.2
+
+Benchmark suite executing with the following configuration:
+warmup: 2 s
+time: 5 s
+memory time: 0 ns
+parallel: 1
+inputs: none specified
+Estimated total run time: 35 s
+
+Benchmarking Limiter.limit/2 unique ets...
+Benchmarking Limiter.limit/2 unique ets, concurrency...
+Benchmarking Limiter:limit/2 atomics...
+Benchmarking Limiter:limit/2 shared ets...
+Benchmarking Limiter:limit/2 shared ets, concurrency...
+
+Name ips average deviation median 99th %
+Limiter:limit/2 atomics 491.88 K 2.03 μs ±1506.30% 1.55 μs 3.48 μs
+Limiter.limit/2 unique ets 414.63 K 2.41 μs ±1169.34% 1.97 μs 4.53 μs
+Limiter:limit/2 shared ets 411.43 K 2.43 μs ±1286.95% 1.96 μs 3.66 μs
+Limiter.limit/2 unique ets, concurrency 406.50 K 2.46 μs ±1006.31% 2.06 μs 4.34 μs
+Limiter:limit/2 shared ets, concurrency 384.04 K 2.60 μs ±1293.25% 2.12 μs 4.37 μs
+
+Comparison:
+Limiter:limit/2 atomics 491.88 K
+Limiter.limit/2 unique ets 414.63 K - 1.19x slower +0.38 μs
+Limiter:limit/2 shared ets 411.43 K - 1.20x slower +0.40 μs
+Limiter.limit/2 unique ets, concurrency 406.50 K - 1.21x slower +0.43 μs
+Limiter:limit/2 shared ets, concurrency 384.04 K - 1.28x slower +0.57 μs
+
+
+
+
+multi, parallel
+
+
+
+
+Operating System: Linux
+CPU Information: AMD EPYC 7401P 24-Core Processor
+Number of Available Cores: 8
+Available memory: 31.41 GB
+Elixir 1.10.3
+Erlang 22.3.2
+
+Benchmark suite executing with the following configuration:
+warmup: 2 s
+time: 5 s
+memory time: 0 ns
+parallel: 8
+inputs: none specified
+Estimated total run time: 35 s
+
+Benchmarking Limiter.limit/2 unique ets...
+Benchmarking Limiter.limit/2 unique ets, concurrency...
+Benchmarking Limiter:limit/2 atomics...
+Benchmarking Limiter:limit/2 shared ets...
+Benchmarking Limiter:limit/2 shared ets, concurrency...
+
+Name ips average deviation median 99th %
+Limiter:limit/2 atomics 307.84 K 3.25 μs ±1113.62% 2.09 μs 10.24 μs
+Limiter.limit/2 unique ets, concurrency 95.56 K 10.46 μs ±391.37% 2.93 μs 163.02 μs
+Limiter:limit/2 shared ets, concurrency 92.39 K 10.82 μs ±374.36% 2.92 μs 158.97 μs
+Limiter.limit/2 unique ets 80.68 K 12.39 μs ±362.74% 2.85 μs 160.66 μs
+Limiter:limit/2 shared ets 6.04 K 165.66 μs ±17.23% 167.48 μs 237.96 μs
+
+Comparison:
+Limiter:limit/2 atomics 307.84 K
+Limiter.limit/2 unique ets, concurrency 95.56 K - 3.22x slower +7.22 μs
+Limiter:limit/2 shared ets, concurrency 92.39 K - 3.33x slower +7.57 μs
+Limiter.limit/2 unique ets 80.68 K - 3.82x slower +9.15 μs
+Limiter:limit/2 shared ets 6.04 K - 51.00x slower +162.41 μs
diff --git a/test/samples/update_counter.exs b/test/samples/update_counter.exs
new file mode 100644
index 0000000..1768735
--- /dev/null
+++ b/test/samples/update_counter.exs
@@ -0,0 +1,19 @@
+:ets.new(:limiter_bench, [:public, :named_table])
+
+Benchee.run(
+ %{
+ "ets:update_counter" => fn ->
+ :ets.update_counter(:limiter_bench, "bench", {2, 1}, {"bench", 0})
+ end
+ },
+ parallel: 1
+)
+
+Benchee.run(
+ %{
+ "ets:update_counter" => fn ->
+ :ets.update_counter(:limiter_bench, "bench", {2, 1}, {"bench", 0})
+ end
+ },
+ parallel: System.schedulers_online()
+)