From b541ef96741129f5aebd2b7526bbca8717cf5c40 Mon Sep 17 00:00:00 2001 From: Jordan Bracco Date: Fri, 8 May 2020 20:10:33 +0200 Subject: Switch to atomics, add shared ets, .. --- README.md | 3 + lib/limiter.ex | 155 ++++++++++++++++++++++++++++----- mix.exs | 5 +- mix.lock | 5 ++ test/limiter_test.exs | 36 ++------ test/samples/limiter.exs | 33 +++++-- test/samples/multi_limiter.exs | 56 ++++++++++++ test/samples/results_multi_limiter.txt | 86 ++++++++++++++++++ test/samples/update_counter.exs | 19 ++++ 9 files changed, 337 insertions(+), 61 deletions(-) create mode 100644 README.md create mode 100644 test/samples/multi_limiter.exs create mode 100644 test/samples/results_multi_limiter.txt create mode 100644 test/samples/update_counter.exs diff --git a/README.md b/README.md new file mode 100644 index 0000000..2e214c3 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# Limiter + +See the docs in `lib/limiter.ex`. diff --git a/lib/limiter.ex b/lib/limiter.ex index 84c601c..f47e68f 100644 --- a/lib/limiter.ex +++ b/lib/limiter.ex @@ -1,47 +1,158 @@ defmodule Limiter do - @ets __MODULE__.ETS + require Logger - def new(name, max_running, max_waiting) do + @moduledoc """ + # Limiter + + A concurrency limiter. Limits the number of concurrent invocations possible, without using a worker pool or different processes. + + It supports two storage methods: + + * **`[atomics](https://erlang.org/doc/man/atomics.html)`** recommended and default if your OTP is > 21.2. + * **`[ets](https://erlang.org/doc/man/ets.html)`** either with a single table per Limiter (faster) or a shared table (better for a large number of limiters). + + You would however always want to use atomics, ets is mostly there for backwards compatibility. + """ + + @doc """ + Initializes a `Limiter`. + """ + + @spec new(name, max_running, max_waiting, options) :: :ok | {:error, :existing} + when name: atom(), + max_running: non_neg_integer(), + max_waiting: non_neg_integer() | :infinity, + options: [option], + option: {:wait, non_neg_integer()} | backend, + backend: :atomics | ets_backend, + ets_backend: :ets | {:ets, atom()} | {:ets, ets_name :: atom(), ets_options :: []} + def new(name, max_running, max_waiting, options \\ []) do + name = atom_name(name) + + if defined?(name) do + {:error, :existing} + else + wait = Keyword.get(options, :wait, 150) + backend = Keyword.get(options, :backend, default_backend()) + {:ok, backend} = setup_backend(backend) + :persistent_term.put(name, {__MODULE__, max_running, max_waiting, backend, wait}) + :ok + end + end + + @spec set(name, new_max_running, new_max_waiting, options) :: :ok | :error + when name: atom(), + new_max_running: non_neg_integer(), + new_max_waiting: non_neg_integer() | :infinity, + options: [option], + option: {:wait, non_neg_integer()} + @doc "Adjust the limiter limits at runtime" + def set(name, new_max_running, new_max_waiting, options \\ []) do name = atom_name(name) - :persistent_term.put(name, {max_running, max_waiting}) - :ets.new(name, [:public, :named_table]) - :ok + + if defined?(name) do + new_wait = Keyword.get(options, :wait) + {__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name) + + new = + {__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, backend, + new_wait || wait} + + :persistent_term.put(name, new) + :ok + else + :error + end end + @spec limit(atom(), function()) :: {:error, :overload} | any() + @doc "Limits invocation of `fun`." def limit(name, fun) do - {max_running, max_waiting} = :persistent_term.get(atom_name(name)) + do_limit(atom_name(name), fun) + end + + defp do_limit(name, fun) do + {__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name) max = max_running + max_waiting - counter = inc(name) + counter = inc(backend, name) cond do counter <= max_running -> - fun.() + try do + fun.() + after + dec(backend, name) + end counter > max -> + dec(backend, name) {:error, :overload} counter > max_running -> - wait(name, fun) + wait(backend, name, wait, fun) end - after - dec(name) end - defp wait(name, fun) do - Process.sleep(150) - dec(name) - limit(name, fun) + defp wait(backend, name, wait, fun) do + Process.sleep(wait) + dec(backend, name) + do_limit(name, fun) end - defp inc(name) do - name = atom_name(name) - :ets.update_counter(name, name, {2, 1}, {name, 0}) + defp inc({:ets, ets}, name) do + :ets.update_counter(ets, name, {2, 1}, {name, 0}) end - def dec(name) do - name = atom_name(name) - :ets.update_counter(name, name, {2, -1}, {name, 0}) + defp inc({:atomics, ref}, _) do + :atomics.add_get(ref, 1, 1) end - defp atom_name(suffix), do: Module.concat(@ets, suffix) + def dec({:ets, ets}, name) do + :ets.update_counter(ets, name, {2, -1}, {name, 0}) + end + + def dec({:atomics, ref}, _) do + :atomics.sub_get(ref, 1, 1) + end + + defp atom_name(suffix), do: Module.concat(__MODULE__, suffix) + + defp defined?(name) do + {__MODULE__, _, _, _, _, _} = :persistent_term.get(name) + true + rescue + _ -> false + end + + defp default_backend() do + if Code.ensure_loaded?(:atomics) do + :atomics + else + Logger.debug("Limiter: atomics not available, using ETS backend") + :ets + end + end + + defp setup_backend(:ets) do + setup_backend({:ets, ETS}) + end + + defp setup_backend({:ets, name}) do + setup_backend({:ets, name, [{:write_concurrency, true}, {:read_concurrency, true}]}) + end + + defp setup_backend({:ets, name, options}) do + ets_name = atom_name(name) + + case :ets.whereis(ets_name) do + :undefined -> :ets.new(ets_name, [:public, :named_table] ++ options) + _ -> nil + end + + {:ok, {:ets, ets_name}} + end + + defp setup_backend(:atomics) do + {:ok, {:atomics, :atomics.new(1, signed: true)}} + end end diff --git a/mix.exs b/mix.exs index f3ff456..0ff4802 100644 --- a/mix.exs +++ b/mix.exs @@ -11,19 +11,16 @@ defmodule Limiter.MixProject do ] end - # Run "mix help compile.app" to learn about applications. def application do [ extra_applications: [:logger] ] end - # Run "mix help deps" to learn about dependencies. defp deps do [ + {:ex_doc, "~> 0.21", only: :dev, runtime: false}, {:benchee, "~> 1.0", only: [:dev, :test]} - # {:dep_from_hexpm, "~> 0.3.0"}, - # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"} ] end end diff --git a/mix.lock b/mix.lock index 5aa28d0..fa32375 100644 --- a/mix.lock +++ b/mix.lock @@ -1,4 +1,9 @@ %{ "benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, + "earmark": {:hex, :earmark, "1.4.4", "4821b8d05cda507189d51f2caeef370cf1e18ca5d7dfb7d31e9cafe6688106a4", [:mix], [], "hexpm", "1f93aba7340574847c0f609da787f0d79efcab51b044bb6e242cae5aca9d264d"}, + "ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "0db1ee8d1547ab4877c5b5dffc6604ef9454e189928d5ba8967d4a58a801f161"}, + "makeup": {:hex, :makeup, "1.0.1", "82f332e461dc6c79dbd82fbe2a9c10d48ed07146f0a478286e590c83c52010b5", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "49736fe5b66a08d8575bf5321d716bac5da20c8e6b97714fec2bcd6febcfa1f8"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"}, + "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"}, } diff --git a/test/limiter_test.exs b/test/limiter_test.exs index c60db9e..1b81ead 100644 --- a/test/limiter_test.exs +++ b/test/limiter_test.exs @@ -2,38 +2,20 @@ defmodule LimiterTest do use ExUnit.Case doctest Limiter - defp test_ets(name, max, sleep, fun) do - count = :ets.update_counter(:limiter_test, name, {2, 1}, {name, 0}) - - if count <= max do - fun.({:ok, count}) - Process.sleep(sleep) - else - fun.(:fail) - end - after - :ets.update_counter(:limiter_test, name, {2, -1}, {name, 1}) + test "limiter ets is atomic" do + name = "test1" + Limiter.new(name, 2, 2) + atomic_test(name) end - test "limits with ets" do - :ets.new(:limiter_test, [:public, :named_table]) - ets = "test" - test = self() - spawn_link(fn -> test_ets(ets, 2, 500, fn result -> send(test, result) end) end) - spawn_link(fn -> test_ets(ets, 2, 750, fn result -> send(test, result) end) end) - spawn_link(fn -> test_ets(ets, 2, 500, fn result -> send(test, result) end) end) - assert_receive {:ok, 1} - assert_receive {:ok, 2} - assert_receive :fail - Process.sleep(500) - spawn_link(fn -> test_ets(ets, 2, 500, fn result -> send(test, result) end) end) - assert_receive {:ok, 2} + test "limiter atomics is atomic" do + name = "test2" + Limiter.new(name, 2, 2, backend: :atomics) + atomic_test(name) end - test "limiter" do - name = "test1" + defp atomic_test(name) do self = self() - Limiter.set(name, 2, 2) sleepy = fn sleep -> case Limiter.limit(name, fn -> diff --git a/test/samples/limiter.exs b/test/samples/limiter.exs index 896056d..785c85f 100644 --- a/test/samples/limiter.exs +++ b/test/samples/limiter.exs @@ -1,11 +1,28 @@ -:ets.new(:limiter_bench, [:public, :named_table]) -Limiter.new(:bench, 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000, 0) +infinite = 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000 +Limiter.new(:bench, infinite, 0) +Limiter.new(:bench_s, infinite, 0, ets: LimiterTest) -Benchee.run(%{ - "update_counter" => fn -> - :ets.update_counter(:limiter_bench, "bench", {2, 1}, {"bench", 0}) - end, - "limit" => fn -> +concurrent = [{:read_concurrency, true}, {:write_concurrency, true}] + +Limiter.new(:bench_rw, infinite, 0) +Limiter.new(:bench_s_rw, infinite, 0, ets: LimiterTest, ets_opts: concurrent) + +single = %{ + "Limiter.limit/2" => fn -> Limiter.limit(:bench, fn -> :ok end) + end, + "Limiter.limit/2 with concurrency" => fn -> + Limiter.limit(:bench_rw, fn -> :ok end) + end, + "Limiter:limit/2 with shared ets" => fn -> + Limiter.limit(:bench_s, fn -> :ok end) + end, + "Limiter:limit/2 with shared ets and concurrency" => fn -> + Limiter.limit(:bench_s_rw, fn -> :ok end) end -}) +} + +IO.puts("\n\n\n\nsingle, sequential\n\n\n\n") +Benchee.run(single, parallel: 1) +IO.puts("\n\n\n\nsingle, parallel\n\n\n\n") +Benchee.run(single, parallel: System.schedulers_online()) diff --git a/test/samples/multi_limiter.exs b/test/samples/multi_limiter.exs new file mode 100644 index 0000000..abec65d --- /dev/null +++ b/test/samples/multi_limiter.exs @@ -0,0 +1,56 @@ +infinite = 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000 + +Limiter.new(:bench_u_0, infinite, 0, backend: {:ets, LimiterTest0, []}) +Limiter.new(:bench_u_1, infinite, 0, backend: {:ets, LimiterTest1, []}) +Limiter.new(:bench_u_2, infinite, 0, backend: {:ets, LimiterTest2, []}) +Limiter.new(:bench_u_3, infinite, 0, backend: {:ets, LimiterTest3, []}) + +Limiter.new(:bench_a_0, infinite, 0, backend: :atomics) +Limiter.new(:bench_a_1, infinite, 0, backend: :atomics) +Limiter.new(:bench_a_2, infinite, 0, backend: :atomics) +Limiter.new(:bench_a_3, infinite, 0, backend: :atomics) + +Limiter.new(:bench_s_0, infinite, 0, backend: {:ets, LimiterTest, []}) +Limiter.new(:bench_s_1, infinite, 0, backend: {:ets, LimiterTest, []}) +Limiter.new(:bench_s_2, infinite, 0, backend: {:ets, LimiterTest, []}) +Limiter.new(:bench_s_3, infinite, 0, backend: {:ets, LimiterTest, []}) + +rw = [{:read_concurrency, true}, {:write_concurrency, true}] + +Limiter.new(:bench_u_rw0, infinite, 0, backend: {:ets, LimiterTestRW0, rw}) +Limiter.new(:bench_u_rw1, infinite, 0, backend: {:ets, LimiterTestRW1, rw}) +Limiter.new(:bench_u_rw2, infinite, 0, backend: {:ets, LimiterTestRW2, rw}) +Limiter.new(:bench_u_rw3, infinite, 0, backend: {:ets, LimiterTestRW3, rw}) + +Limiter.new(:bench_s_rw0, infinite, 0, backend: {:ets, LimiterTestRW, rw}) +Limiter.new(:bench_s_rw1, infinite, 0, backend: {:ets, LimiterTestRW, rw}) +Limiter.new(:bench_s_rw2, infinite, 0, backend: {:ets, LimiterTestRW, rw}) +Limiter.new(:bench_s_rw3, infinite, 0, backend: {:ets, LimiterTestRW, rw}) + +multiple = %{ + "Limiter.limit/2 unique ets" => fn -> + limiter = Enum.random([:bench_u_0, :bench_u_1, :bench_u_2, :bench_u_3]) + Limiter.limit(limiter, fn -> :ok end) + end, + "Limiter:limit/2 shared ets" => fn -> + limiter = Enum.random([:bench_s_0, :bench_s_1, :bench_s_2, :bench_s_3]) + Limiter.limit(limiter, fn -> :ok end) + end, + "Limiter.limit/2 unique ets, concurrency" => fn -> + limiter = Enum.random([:bench_u_rw0, :bench_u_rw1, :bench_u_rw2, :bench_u_rw3]) + Limiter.limit(limiter, fn -> :ok end) + end, + "Limiter:limit/2 shared ets, concurrency" => fn -> + limiter = Enum.random([:bench_s_rw0, :bench_s_rw1, :bench_s_rw2, :bench_s_rw3]) + Limiter.limit(limiter, fn -> :ok end) + end, + "Limiter:limit/2 atomics" => fn -> + limiter = Enum.random([:bench_a_0, :bench_a_1, :bench_a_2, :bench_a_3]) + Limiter.limit(limiter, fn -> :ok end) + end +} + +IO.puts("\n\n\n\nmulti, sequential\n\n\n\n") +Benchee.run(multiple) +IO.puts("\n\n\n\nmulti, parallel\n\n\n\n") +Benchee.run(multiple, parallel: System.schedulers_online()) diff --git a/test/samples/results_multi_limiter.txt b/test/samples/results_multi_limiter.txt new file mode 100644 index 0000000..112c325 --- /dev/null +++ b/test/samples/results_multi_limiter.txt @@ -0,0 +1,86 @@ + + + + +multi, sequential + + + + +Operating System: Linux +CPU Information: AMD EPYC 7401P 24-Core Processor +Number of Available Cores: 8 +Available memory: 31.41 GB +Elixir 1.10.3 +Erlang 22.3.2 + +Benchmark suite executing with the following configuration: +warmup: 2 s +time: 5 s +memory time: 0 ns +parallel: 1 +inputs: none specified +Estimated total run time: 35 s + +Benchmarking Limiter.limit/2 unique ets... +Benchmarking Limiter.limit/2 unique ets, concurrency... +Benchmarking Limiter:limit/2 atomics... +Benchmarking Limiter:limit/2 shared ets... +Benchmarking Limiter:limit/2 shared ets, concurrency... + +Name ips average deviation median 99th % +Limiter:limit/2 atomics 491.88 K 2.03 μs ±1506.30% 1.55 μs 3.48 μs +Limiter.limit/2 unique ets 414.63 K 2.41 μs ±1169.34% 1.97 μs 4.53 μs +Limiter:limit/2 shared ets 411.43 K 2.43 μs ±1286.95% 1.96 μs 3.66 μs +Limiter.limit/2 unique ets, concurrency 406.50 K 2.46 μs ±1006.31% 2.06 μs 4.34 μs +Limiter:limit/2 shared ets, concurrency 384.04 K 2.60 μs ±1293.25% 2.12 μs 4.37 μs + +Comparison: +Limiter:limit/2 atomics 491.88 K +Limiter.limit/2 unique ets 414.63 K - 1.19x slower +0.38 μs +Limiter:limit/2 shared ets 411.43 K - 1.20x slower +0.40 μs +Limiter.limit/2 unique ets, concurrency 406.50 K - 1.21x slower +0.43 μs +Limiter:limit/2 shared ets, concurrency 384.04 K - 1.28x slower +0.57 μs + + + + +multi, parallel + + + + +Operating System: Linux +CPU Information: AMD EPYC 7401P 24-Core Processor +Number of Available Cores: 8 +Available memory: 31.41 GB +Elixir 1.10.3 +Erlang 22.3.2 + +Benchmark suite executing with the following configuration: +warmup: 2 s +time: 5 s +memory time: 0 ns +parallel: 8 +inputs: none specified +Estimated total run time: 35 s + +Benchmarking Limiter.limit/2 unique ets... +Benchmarking Limiter.limit/2 unique ets, concurrency... +Benchmarking Limiter:limit/2 atomics... +Benchmarking Limiter:limit/2 shared ets... +Benchmarking Limiter:limit/2 shared ets, concurrency... + +Name ips average deviation median 99th % +Limiter:limit/2 atomics 307.84 K 3.25 μs ±1113.62% 2.09 μs 10.24 μs +Limiter.limit/2 unique ets, concurrency 95.56 K 10.46 μs ±391.37% 2.93 μs 163.02 μs +Limiter:limit/2 shared ets, concurrency 92.39 K 10.82 μs ±374.36% 2.92 μs 158.97 μs +Limiter.limit/2 unique ets 80.68 K 12.39 μs ±362.74% 2.85 μs 160.66 μs +Limiter:limit/2 shared ets 6.04 K 165.66 μs ±17.23% 167.48 μs 237.96 μs + +Comparison: +Limiter:limit/2 atomics 307.84 K +Limiter.limit/2 unique ets, concurrency 95.56 K - 3.22x slower +7.22 μs +Limiter:limit/2 shared ets, concurrency 92.39 K - 3.33x slower +7.57 μs +Limiter.limit/2 unique ets 80.68 K - 3.82x slower +9.15 μs +Limiter:limit/2 shared ets 6.04 K - 51.00x slower +162.41 μs diff --git a/test/samples/update_counter.exs b/test/samples/update_counter.exs new file mode 100644 index 0000000..1768735 --- /dev/null +++ b/test/samples/update_counter.exs @@ -0,0 +1,19 @@ +:ets.new(:limiter_bench, [:public, :named_table]) + +Benchee.run( + %{ + "ets:update_counter" => fn -> + :ets.update_counter(:limiter_bench, "bench", {2, 1}, {"bench", 0}) + end + }, + parallel: 1 +) + +Benchee.run( + %{ + "ets:update_counter" => fn -> + :ets.update_counter(:limiter_bench, "bench", {2, 1}, {"bench", 0}) + end + }, + parallel: System.schedulers_online() +) -- cgit v1.2.3