summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJordan Bracco <href@random.sh>2020-09-08 11:39:55 +0200
committerJordan Bracco <href@random.sh>2020-09-08 11:44:47 +0200
commit12490aa78a9f6e293df00a4b4a59eb00b5db4219 (patch)
treea03a3bed0eefdb9393aedb855e568a8e820ced12
parentFix behaviour of max_waiting = 0 with max_running = 1 (diff)
Sentinel processes - ensure counter is always decremented
-rw-r--r--CHANGELOG.md1
-rw-r--r--lib/concurrent_limiter.ex66
-rw-r--r--test/concurrent_limiter_test.exs23
-rw-r--r--test/samples/limiter.exs19
4 files changed, 72 insertions, 37 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index daa5288..292db69 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Fixed
- Decrement counter when max retries has been reached.
+- Ensure counter is always decremented in case of process being killed (by using "sentinel" processes that monitors).
- Fixes behaviour of `max_waiting = 0` with `max_size = 1`.
## [0.1.0] - 2020-05-16
diff --git a/lib/concurrent_limiter.ex b/lib/concurrent_limiter.ex
index f871276..d1be6e6 100644
--- a/lib/concurrent_limiter.ex
+++ b/lib/concurrent_limiter.ex
@@ -97,6 +97,7 @@ defmodule ConcurrentLimiter do
max = max_running + max_waiting
counter = inc(ref, name)
max_retries = Keyword.get(opts, :max_retries) || max_retries
+ sentinel = Keyword.get(opts, :sentinel) || true
:telemetry.execute([:concurrent_limiter, :limit], %{counter: counter}, %{limiter: name})
cond do
@@ -105,20 +106,13 @@ defmodule ConcurrentLimiter do
limiter: name
})
- Process.flag(:trap_exit, true)
+ mon = sentinel_start(sentinel, ref, name)
try do
fun.()
after
dec(ref, name)
- Process.flag(:trap_exit, false)
-
- receive do
- {:EXIT, _, reason} ->
- Process.exit(self(), reason)
- after
- 0 -> :noop
- end
+ sentinel_stop(mon)
end
counter > max ->
@@ -127,13 +121,24 @@ defmodule ConcurrentLimiter do
scope: "max"
})
+ dec(ref, name)
+ {:error, :overload}
+
max_waiting == 0 ->
- :telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{limiter: name, scope: "max"})
+ :telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{
+ limiter: name,
+ scope: "max"
+ })
+
dec(ref, name)
{:error, :overload}
- counter > max ->
- :telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{limiter: name, scope: "max"})
+ counter > max ->
+ :telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{
+ limiter: name,
+ scope: "max"
+ })
+
dec(ref, name)
{:error, :overload}
@@ -152,17 +157,15 @@ defmodule ConcurrentLimiter do
retries: retries + 1
})
- wait(ref, name, fun, wait, opts, retries + 1)
+ mon = sentinel_start(sentinel, ref, name)
+ wait = Keyword.get(opts, :timeout) || wait
+ Process.sleep(wait)
+ dec(ref, name)
+ sentinel_stop(mon)
+ do_limit(name, fun, opts, retries + 1)
end
end
- defp wait(ref, name, fun, wait, opts, retries) do
- wait = Keyword.get(opts, :timeout) || wait
- Process.sleep(wait)
- dec(ref, name)
- do_limit(name, fun, opts, retries)
- end
-
defp inc(ref, _) do
:atomics.add_get(ref, 1, 1)
end
@@ -179,4 +182,27 @@ defmodule ConcurrentLimiter do
rescue
_ -> false
end
+
+ defp sentinel_start(true, ref, name) do
+ self = self()
+
+ spawn(fn ->
+ sentinel_run(ref, name, self, Process.monitor(self))
+ end)
+ end
+
+ defp sentinel_start(_, _, _), do: nil
+
+ defp sentinel_stop(pid) when is_pid(pid) do
+ Process.exit(pid, :normal)
+ end
+
+ defp sentinel_stop(_), do: nil
+
+ defp sentinel_run(ref, name, pid, mon) do
+ receive do
+ {:DOWN, ^mon, _, ^pid, reason} ->
+ dec(ref, name)
+ end
+ end
end
diff --git a/test/concurrent_limiter_test.exs b/test/concurrent_limiter_test.exs
index 74c9590..c06a274 100644
--- a/test/concurrent_limiter_test.exs
+++ b/test/concurrent_limiter_test.exs
@@ -9,14 +9,33 @@ defmodule ConcurrentLimiterTest do
test "limited to one" do
name = "l1"
ConcurrentLimiter.new(name, 1, 0, max_retries: 0)
- endless = fn() -> :timer.sleep(10000) end
- spawn(fn() -> ConcurrentLimiter.limit(name, endless) end)
+ endless = fn -> :timer.sleep(10_000) end
+ spawn(fn -> ConcurrentLimiter.limit(name, endless) end)
:timer.sleep(5)
{:error, :overload} = ConcurrentLimiter.limit(name, endless)
{:error, :overload} = ConcurrentLimiter.limit(name, endless)
{:error, :overload} = ConcurrentLimiter.limit(name, endless)
end
+ test "decrements correctly when current pid exits" do
+ name = "l1crash"
+ ConcurrentLimiter.new(name, 1, 0, max_retries: 0)
+ endless = fn -> :timer.sleep(100) end
+
+ pid =
+ spawn(fn ->
+ ConcurrentLimiter.limit(name, endless)
+ end)
+
+ # let some time for spawn to execute
+ :timer.sleep(5)
+ {:error, :overload} = ConcurrentLimiter.limit(name, endless)
+ Process.exit(pid, :kill)
+ # let some time for exit to execute
+ :timer.sleep(5)
+ :ok = ConcurrentLimiter.limit(name, fn -> :ok end)
+ end
+
test "limiter is atomic" do
name = "test"
ConcurrentLimiter.new(name, 2, 2)
diff --git a/test/samples/limiter.exs b/test/samples/limiter.exs
index f903658..690d09a 100644
--- a/test/samples/limiter.exs
+++ b/test/samples/limiter.exs
@@ -1,24 +1,13 @@
infinite = 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000
ConcurrentLimiter.new(:bench, infinite, 0)
-ConcurrentLimiter.new(:bench_s, infinite, 0, ets: ConcurrentLimiterTest)
-
-concurrent = [{:read_concurrency, true}, {:write_concurrency, true}]
-
-ConcurrentLimiter.new(:bench_rw, infinite, 0)
-ConcurrentLimiter.new(:bench_s_rw, infinite, 0, ets: ConcurrentLimiterTest, ets_opts: concurrent)
+ConcurrentLimiter.new(:bench_no_sentinel, infinite, 0, sentinel: false)
single = %{
- "ConcurrentLimiter.limit/2" => fn ->
+ "ConcurrentLimiter.limit/2 (with sentinels)" => fn ->
ConcurrentLimiter.limit(:bench, fn -> :ok end)
end,
- "ConcurrentLimiter.limit/2 with concurrency" => fn ->
- ConcurrentLimiter.limit(:bench_rw, fn -> :ok end)
- end,
- "ConcurrentLimiter:limit/2 with shared ets" => fn ->
- ConcurrentLimiter.limit(:bench_s, fn -> :ok end)
- end,
- "ConcurrentLimiter:limit/2 with shared ets and concurrency" => fn ->
- ConcurrentLimiter.limit(:bench_s_rw, fn -> :ok end)
+ "ConcurrentLimiter.limit/2 (without sentinels)" => fn ->
+ ConcurrentLimiter.limit(:bench_no_sentinel, fn -> :ok end)
end
}