diff options
author | Jordan Bracco <href@random.sh> | 2020-09-08 11:39:55 +0200 |
---|---|---|
committer | Jordan Bracco <href@random.sh> | 2020-09-08 11:44:47 +0200 |
commit | 12490aa78a9f6e293df00a4b4a59eb00b5db4219 (patch) | |
tree | a03a3bed0eefdb9393aedb855e568a8e820ced12 | |
parent | Fix behaviour of max_waiting = 0 with max_running = 1 (diff) |
Sentinel processes - ensure counter is always decremented
-rw-r--r-- | CHANGELOG.md | 1 | ||||
-rw-r--r-- | lib/concurrent_limiter.ex | 66 | ||||
-rw-r--r-- | test/concurrent_limiter_test.exs | 23 | ||||
-rw-r--r-- | test/samples/limiter.exs | 19 |
4 files changed, 72 insertions, 37 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index daa5288..292db69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Fixed - Decrement counter when max retries has been reached. +- Ensure counter is always decremented in case of process being killed (by using "sentinel" processes that monitors). - Fixes behaviour of `max_waiting = 0` with `max_size = 1`. ## [0.1.0] - 2020-05-16 diff --git a/lib/concurrent_limiter.ex b/lib/concurrent_limiter.ex index f871276..d1be6e6 100644 --- a/lib/concurrent_limiter.ex +++ b/lib/concurrent_limiter.ex @@ -97,6 +97,7 @@ defmodule ConcurrentLimiter do max = max_running + max_waiting counter = inc(ref, name) max_retries = Keyword.get(opts, :max_retries) || max_retries + sentinel = Keyword.get(opts, :sentinel) || true :telemetry.execute([:concurrent_limiter, :limit], %{counter: counter}, %{limiter: name}) cond do @@ -105,20 +106,13 @@ defmodule ConcurrentLimiter do limiter: name }) - Process.flag(:trap_exit, true) + mon = sentinel_start(sentinel, ref, name) try do fun.() after dec(ref, name) - Process.flag(:trap_exit, false) - - receive do - {:EXIT, _, reason} -> - Process.exit(self(), reason) - after - 0 -> :noop - end + sentinel_stop(mon) end counter > max -> @@ -127,13 +121,24 @@ defmodule ConcurrentLimiter do scope: "max" }) + dec(ref, name) + {:error, :overload} + max_waiting == 0 -> - :telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{limiter: name, scope: "max"}) + :telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{ + limiter: name, + scope: "max" + }) + dec(ref, name) {:error, :overload} - counter > max -> - :telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{limiter: name, scope: "max"}) + counter > max -> + :telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{ + limiter: name, + scope: "max" + }) + dec(ref, name) {:error, :overload} @@ -152,17 +157,15 @@ defmodule ConcurrentLimiter do retries: retries + 1 }) - wait(ref, name, fun, wait, opts, retries + 1) + mon = sentinel_start(sentinel, ref, name) + wait = Keyword.get(opts, :timeout) || wait + Process.sleep(wait) + dec(ref, name) + sentinel_stop(mon) + do_limit(name, fun, opts, retries + 1) end end - defp wait(ref, name, fun, wait, opts, retries) do - wait = Keyword.get(opts, :timeout) || wait - Process.sleep(wait) - dec(ref, name) - do_limit(name, fun, opts, retries) - end - defp inc(ref, _) do :atomics.add_get(ref, 1, 1) end @@ -179,4 +182,27 @@ defmodule ConcurrentLimiter do rescue _ -> false end + + defp sentinel_start(true, ref, name) do + self = self() + + spawn(fn -> + sentinel_run(ref, name, self, Process.monitor(self)) + end) + end + + defp sentinel_start(_, _, _), do: nil + + defp sentinel_stop(pid) when is_pid(pid) do + Process.exit(pid, :normal) + end + + defp sentinel_stop(_), do: nil + + defp sentinel_run(ref, name, pid, mon) do + receive do + {:DOWN, ^mon, _, ^pid, reason} -> + dec(ref, name) + end + end end diff --git a/test/concurrent_limiter_test.exs b/test/concurrent_limiter_test.exs index 74c9590..c06a274 100644 --- a/test/concurrent_limiter_test.exs +++ b/test/concurrent_limiter_test.exs @@ -9,14 +9,33 @@ defmodule ConcurrentLimiterTest do test "limited to one" do name = "l1" ConcurrentLimiter.new(name, 1, 0, max_retries: 0) - endless = fn() -> :timer.sleep(10000) end - spawn(fn() -> ConcurrentLimiter.limit(name, endless) end) + endless = fn -> :timer.sleep(10_000) end + spawn(fn -> ConcurrentLimiter.limit(name, endless) end) :timer.sleep(5) {:error, :overload} = ConcurrentLimiter.limit(name, endless) {:error, :overload} = ConcurrentLimiter.limit(name, endless) {:error, :overload} = ConcurrentLimiter.limit(name, endless) end + test "decrements correctly when current pid exits" do + name = "l1crash" + ConcurrentLimiter.new(name, 1, 0, max_retries: 0) + endless = fn -> :timer.sleep(100) end + + pid = + spawn(fn -> + ConcurrentLimiter.limit(name, endless) + end) + + # let some time for spawn to execute + :timer.sleep(5) + {:error, :overload} = ConcurrentLimiter.limit(name, endless) + Process.exit(pid, :kill) + # let some time for exit to execute + :timer.sleep(5) + :ok = ConcurrentLimiter.limit(name, fn -> :ok end) + end + test "limiter is atomic" do name = "test" ConcurrentLimiter.new(name, 2, 2) diff --git a/test/samples/limiter.exs b/test/samples/limiter.exs index f903658..690d09a 100644 --- a/test/samples/limiter.exs +++ b/test/samples/limiter.exs @@ -1,24 +1,13 @@ infinite = 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000 ConcurrentLimiter.new(:bench, infinite, 0) -ConcurrentLimiter.new(:bench_s, infinite, 0, ets: ConcurrentLimiterTest) - -concurrent = [{:read_concurrency, true}, {:write_concurrency, true}] - -ConcurrentLimiter.new(:bench_rw, infinite, 0) -ConcurrentLimiter.new(:bench_s_rw, infinite, 0, ets: ConcurrentLimiterTest, ets_opts: concurrent) +ConcurrentLimiter.new(:bench_no_sentinel, infinite, 0, sentinel: false) single = %{ - "ConcurrentLimiter.limit/2" => fn -> + "ConcurrentLimiter.limit/2 (with sentinels)" => fn -> ConcurrentLimiter.limit(:bench, fn -> :ok end) end, - "ConcurrentLimiter.limit/2 with concurrency" => fn -> - ConcurrentLimiter.limit(:bench_rw, fn -> :ok end) - end, - "ConcurrentLimiter:limit/2 with shared ets" => fn -> - ConcurrentLimiter.limit(:bench_s, fn -> :ok end) - end, - "ConcurrentLimiter:limit/2 with shared ets and concurrency" => fn -> - ConcurrentLimiter.limit(:bench_s_rw, fn -> :ok end) + "ConcurrentLimiter.limit/2 (without sentinels)" => fn -> + ConcurrentLimiter.limit(:bench_no_sentinel, fn -> :ok end) end } |