diff options
author | rinpatch <rinpatch@sdf.org> | 2020-09-04 23:54:14 +0300 |
---|---|---|
committer | rinpatch <rinpatch@sdf.org> | 2020-09-05 00:02:54 +0300 |
commit | 8c1caa0f10cb5e756f8f2b53d53110a008948275 (patch) | |
tree | 8971cc31af16274fd3f3a1d4278cb7293f3d595c | |
parent | Fix decrement after max_retries been reached + telemetry events (diff) |
Fix counters not decrementing when the process is exited
This fixes the issue by trapping exits for the duration of the fun,
then turning exit trapping off and killing the process if there is an
exit message in the mailbox.
The real-world case where this fixes things is Pleroma MediaProxy.
Without exit trapping, the process would get killed if the connection
was closed client-side and a counter would be left incremented.
I am not sure if trapping exits is the optimal solution, but I don't
see any other option.
-rw-r--r-- | lib/concurrent_limiter.ex | 33 |
1 files changed, 29 insertions, 4 deletions
diff --git a/lib/concurrent_limiter.ex b/lib/concurrent_limiter.ex index e7b6b55..4a367ea 100644 --- a/lib/concurrent_limiter.ex +++ b/lib/concurrent_limiter.ex @@ -101,25 +101,50 @@ defmodule ConcurrentLimiter do cond do counter <= max_running -> - :telemetry.execute([:concurrent_limiter, :execution], %{counter: counter}, %{limiter: name}) + :telemetry.execute([:concurrent_limiter, :execution], %{counter: counter}, %{ + limiter: name + }) + + Process.flag(:trap_exit, true) + try do fun.() after dec(ref, name) + Process.flag(:trap_exit, false) + + receive do + {:EXIT, _, reason} -> + Process.exit(self(), reason) + after + 0 -> :noop + end end counter > max -> - :telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{limiter: name, scope: "max"}) + :telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{ + limiter: name, + scope: "max" + }) + dec(ref, name) {:error, :overload} retries + 1 > max_retries -> - :telemetry.execute([:concurrent_limiter, :max_retries], %{counter: counter}, %{limiter: name, retries: retries + 1}) + :telemetry.execute([:concurrent_limiter, :max_retries], %{counter: counter}, %{ + limiter: name, + retries: retries + 1 + }) + dec(ref, name) {:error, :overload} counter > max_running -> - :telemetry.execute([:concurrent_limiter, :wait], %{counter: counter}, %{limiter: name, retries: retries + 1}) + :telemetry.execute([:concurrent_limiter, :wait], %{counter: counter}, %{ + limiter: name, + retries: retries + 1 + }) + wait(ref, name, fun, wait, opts, retries + 1) end end |