diff options
author | Jordan Bracco <href@random.sh> | 2020-05-16 12:17:40 +0200 |
---|---|---|
committer | Jordan Bracco <href@random.sh> | 2020-05-16 12:22:30 +0200 |
commit | 447b4c9a2e99a37c0d17e01e79d786f2c11590f5 (patch) | |
tree | 87202b1b8e9b09a46ad862a475af0664e5dd753a /lib | |
parent | Remove ETS backend, add max_retries, add options to limit/3. (diff) |
Format and prepare for release
Diffstat (limited to 'lib')
-rw-r--r-- | lib/concurrent_limiter.ex | 69 |
1 files changed, 43 insertions, 26 deletions
diff --git a/lib/concurrent_limiter.ex b/lib/concurrent_limiter.ex index b4572c6..9555f00 100644 --- a/lib/concurrent_limiter.ex +++ b/lib/concurrent_limiter.ex @@ -1,54 +1,71 @@ +# ConcurrentLimiter: A concurrency limiter. +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: LGPL-3.0-only + defmodule ConcurrentLimiter do require Logger @moduledoc """ - # Concurrent Limiter - A concurrency limiter. Limits the number of concurrent invocations possible, without using a worker pool or different processes. It can be useful in cases where you don't need a worker pool but still being able to limit concurrent calls without much overhead. - As it internally uses `persistent_term` to store metadata, and can fallback to ETS tables, it is however not made for a large number - of limiters and cannot be used for things like a per-user rate limiter. - """ + As it internally uses `persistent_term` to store metadata, it is not made for a large number of different or dynamic limiters and + cannot be used for things like a per-user rate limiter. - @doc """ - Initializes a `ConcurrentLimiter`. + ```elixir + :ok = ConcurrentLimiter.new(RequestLimiter, 10, 10) + ConcurrentLimiter.limit(RequestLimiter, fn() -> something_that_can_only_run_ten_times_concurrently() end) + ``` """ @default_wait 150 @default_max_retries 5 - @spec new(name, max_running, max_waiting, options) :: :ok | {:error, :existing} when name: atom(), - max_running: non_neg_integer(), - max_waiting: non_neg_integer() | :infinity, - options: [option], - option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()} + @doc "Initializes a `ConcurrentLimiter`." + @spec new(name, max_running, max_waiting, options) :: :ok | {:error, :existing} + when name: atom(), + max_running: non_neg_integer(), + max_waiting: non_neg_integer() | :infinity, + options: [option], + option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()} def new(name, max_running, max_waiting, options \\ []) do name = prefix_name(name) + if defined?(name) do {:error, :existing} else wait = Keyword.get(options, :wait, @default_wait) max_retries = Keyword.get(options, :max_retries, @default_max_retries) - atomics = :atomics.new(1, [signed: true]) - :persistent_term.put(name, {__MODULE__, max_running, max_waiting, atomics, wait, max_retries}) + atomics = :atomics.new(1, signed: true) + + :persistent_term.put( + name, + {__MODULE__, max_running, max_waiting, atomics, wait, max_retries} + ) + :ok end end - @spec set(name, new_max_running, new_max_waiting, options) :: :ok | :error when name: atom(), - new_max_running: non_neg_integer(), - new_max_waiting: non_neg_integer() | :infinity, - options: [option], - option: {:wait, non_neg_integer()} - @doc "Adjust the limiter limits at runtime" + @doc "Adjust the limits at runtime." + @spec set(name, new_max_running, new_max_waiting, options) :: :ok | :error + when name: atom(), + new_max_running: non_neg_integer(), + new_max_waiting: non_neg_integer() | :infinity, + options: [option], + option: {:wait, non_neg_integer()} def set(name, new_max_running, new_max_waiting, options \\ []) do name = prefix_name(name) + if defined?(name) do new_wait = Keyword.get(options, :wait) new_max_retries = Keyword.get(options, :max_retries) {__MODULE__, max_running, max_waiting, ref, wait, max_retries} = :persistent_term.get(name) - new = {__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, ref, new_wait || wait, new_max_retries || max_retries} + + new = + {__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, ref, + new_wait || wait, new_max_retries || max_retries} + :persistent_term.put(name, new) :ok else @@ -56,9 +73,10 @@ defmodule ConcurrentLimiter do end end - @spec limit(atom(), function(), opts) :: {:error, :overload} | any() when opts: [option], - option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()} @doc "Limits invocation of `fun`." + @spec limit(atom(), function(), opts) :: {:error, :overload} | any() + when opts: [option], + option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()} def limit(name, fun, opts \\ []) do do_limit(prefix_name(name), fun, opts, 0) end @@ -85,11 +103,11 @@ defmodule ConcurrentLimiter do {:error, :overload} counter > max_running -> - wait(ref, name, wait, fun, opts, max_retries, retries + 1) + wait(ref, name, fun, wait, opts, retries + 1) end end - defp wait(ref, name, wait, fun, opts, max_retries, retries) do + defp wait(ref, name, fun, wait, opts, retries) do wait = Keyword.get(opts, :timeout) || wait Process.sleep(wait) dec(ref, name) @@ -112,5 +130,4 @@ defmodule ConcurrentLimiter do rescue _ -> false end - end |