summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJordan Bracco <href@random.sh>2020-05-16 12:17:40 +0200
committerJordan Bracco <href@random.sh>2020-05-16 12:22:30 +0200
commit447b4c9a2e99a37c0d17e01e79d786f2c11590f5 (patch)
tree87202b1b8e9b09a46ad862a475af0664e5dd753a /lib
parentRemove ETS backend, add max_retries, add options to limit/3. (diff)
Format and prepare for release
Diffstat (limited to 'lib')
-rw-r--r--lib/concurrent_limiter.ex69
1 files changed, 43 insertions, 26 deletions
diff --git a/lib/concurrent_limiter.ex b/lib/concurrent_limiter.ex
index b4572c6..9555f00 100644
--- a/lib/concurrent_limiter.ex
+++ b/lib/concurrent_limiter.ex
@@ -1,54 +1,71 @@
+# ConcurrentLimiter: A concurrency limiter.
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: LGPL-3.0-only
+
defmodule ConcurrentLimiter do
require Logger
@moduledoc """
- # Concurrent Limiter
-
A concurrency limiter. Limits the number of concurrent invocations possible, without using a worker pool or different processes.
It can be useful in cases where you don't need a worker pool but still being able to limit concurrent calls without much overhead.
- As it internally uses `persistent_term` to store metadata, and can fallback to ETS tables, it is however not made for a large number
- of limiters and cannot be used for things like a per-user rate limiter.
- """
+ As it internally uses `persistent_term` to store metadata, it is not made for a large number of different or dynamic limiters and
+ cannot be used for things like a per-user rate limiter.
- @doc """
- Initializes a `ConcurrentLimiter`.
+ ```elixir
+ :ok = ConcurrentLimiter.new(RequestLimiter, 10, 10)
+ ConcurrentLimiter.limit(RequestLimiter, fn() -> something_that_can_only_run_ten_times_concurrently() end)
+ ```
"""
@default_wait 150
@default_max_retries 5
- @spec new(name, max_running, max_waiting, options) :: :ok | {:error, :existing} when name: atom(),
- max_running: non_neg_integer(),
- max_waiting: non_neg_integer() | :infinity,
- options: [option],
- option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()}
+ @doc "Initializes a `ConcurrentLimiter`."
+ @spec new(name, max_running, max_waiting, options) :: :ok | {:error, :existing}
+ when name: atom(),
+ max_running: non_neg_integer(),
+ max_waiting: non_neg_integer() | :infinity,
+ options: [option],
+ option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()}
def new(name, max_running, max_waiting, options \\ []) do
name = prefix_name(name)
+
if defined?(name) do
{:error, :existing}
else
wait = Keyword.get(options, :wait, @default_wait)
max_retries = Keyword.get(options, :max_retries, @default_max_retries)
- atomics = :atomics.new(1, [signed: true])
- :persistent_term.put(name, {__MODULE__, max_running, max_waiting, atomics, wait, max_retries})
+ atomics = :atomics.new(1, signed: true)
+
+ :persistent_term.put(
+ name,
+ {__MODULE__, max_running, max_waiting, atomics, wait, max_retries}
+ )
+
:ok
end
end
- @spec set(name, new_max_running, new_max_waiting, options) :: :ok | :error when name: atom(),
- new_max_running: non_neg_integer(),
- new_max_waiting: non_neg_integer() | :infinity,
- options: [option],
- option: {:wait, non_neg_integer()}
- @doc "Adjust the limiter limits at runtime"
+ @doc "Adjust the limits at runtime."
+ @spec set(name, new_max_running, new_max_waiting, options) :: :ok | :error
+ when name: atom(),
+ new_max_running: non_neg_integer(),
+ new_max_waiting: non_neg_integer() | :infinity,
+ options: [option],
+ option: {:wait, non_neg_integer()}
def set(name, new_max_running, new_max_waiting, options \\ []) do
name = prefix_name(name)
+
if defined?(name) do
new_wait = Keyword.get(options, :wait)
new_max_retries = Keyword.get(options, :max_retries)
{__MODULE__, max_running, max_waiting, ref, wait, max_retries} = :persistent_term.get(name)
- new = {__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, ref, new_wait || wait, new_max_retries || max_retries}
+
+ new =
+ {__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, ref,
+ new_wait || wait, new_max_retries || max_retries}
+
:persistent_term.put(name, new)
:ok
else
@@ -56,9 +73,10 @@ defmodule ConcurrentLimiter do
end
end
- @spec limit(atom(), function(), opts) :: {:error, :overload} | any() when opts: [option],
- option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()}
@doc "Limits invocation of `fun`."
+ @spec limit(atom(), function(), opts) :: {:error, :overload} | any()
+ when opts: [option],
+ option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()}
def limit(name, fun, opts \\ []) do
do_limit(prefix_name(name), fun, opts, 0)
end
@@ -85,11 +103,11 @@ defmodule ConcurrentLimiter do
{:error, :overload}
counter > max_running ->
- wait(ref, name, wait, fun, opts, max_retries, retries + 1)
+ wait(ref, name, fun, wait, opts, retries + 1)
end
end
- defp wait(ref, name, wait, fun, opts, max_retries, retries) do
+ defp wait(ref, name, fun, wait, opts, retries) do
wait = Keyword.get(opts, :timeout) || wait
Process.sleep(wait)
dec(ref, name)
@@ -112,5 +130,4 @@ defmodule ConcurrentLimiter do
rescue
_ -> false
end
-
end