PSPDFKit API provides a powerful HTTP API that allows users to generate, convert, and edit PDF documents, and we let our customers test the API for free. To prevent resource congestion and uncontrollable infrastructure fees, we decided to rate limit these test API calls to an acceptable number of requests per minute. This post provides some background on rate limiting and explains how we implemented it in our Elixir backend.
Motivation for Rate Limiting
PSPDFKit API’s pricing model works on the basis of prepaid credits: customers buy a pack of credits or subscribe to receive a fixed amount of credits each month. Each API use then spends these prepaid credits.
We use API keys to authenticate API requests. Each customer account has two kinds of API keys associated with it:
- A “live API key,” which consumes credits but has no limitations, so is usable for production.
- A “test API key,” which is free but has multiple limitations: it produces watermarked documents, has a maximum size of 5 MB per generated document, and is rate limited to 10 requests per minute.
The test API keys allow our customers to test the API for free — for example, to demo its capabilities or for integration testing in their continuous integration environments. Since the test keys are free, we decided to employ rate limiting to protect the API from accidental or malicious overuse. An example of this is if a user makes a mistake and sends a request to PSPDFKit API in an infinite loop. This could quickly overload the service and lead to starving other customers.
Theoretical Background
We’ll start with some theory. As mentioned in the previous section, rate limiting is important because it preserves the quality of service without relying on users to adhere to “fair” use of the service. Rate limiting matters even more for computationally intensive services that are deployed to the cloud and billed on a pay-per-use basis. To keep these services profitable (and thus alive), their infrastructure costs need to be predictable. Implementing a variant of rate limiting helps keep the usage within expected bounds.
There are multiple approaches to rate limiting, and the next section will look at a few of the most widely used rate limiting algorithms. Before we begin, I want to give a shoutout to KongHQ for a great blog post that introduced me to this topic.
Leaky Bucket
The base of the Leaky Bucket algorithm is a fixed queue that holds requests. New requests are appended to the end of the queue, and if the queue is full, the future requests are discarded. The first item in the queue is processed at a regular interval, therefore making room for future requests.
The benefit of this algorithm is that it smooths out bursts and processes requests at an approximately constant rate, and it’s conceptually easy to grasp and easy to implement. In addition, its memory usage is predictable, since the request queue has a fixed size per user.
The main problem is that bursts of requests could fill the queue and prevent more requests from being processed until the old requests are processed and removed from the queue. Moreover, slow requests in the queue could lead to a slow response time for future requests.
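To make the queue mechanics concrete, here's a minimal sketch of a leaky bucket as a plain data structure. The module name, the `new/1`, `push/2`, and `leak/1` functions, and the capacity value are assumptions made for illustration; a real implementation would also need a timer that calls `leak/1` at the regular interval.

```elixir
defmodule LeakyBucketSketch do
  # Hypothetical module: a fixed-capacity FIFO queue of pending requests.
  defstruct queue: nil, size: 0, capacity: 0

  def new(capacity) do
    %__MODULE__{queue: :queue.new(), size: 0, capacity: capacity}
  end

  # The bucket is full: the new request is discarded.
  def push(%__MODULE__{size: size, capacity: capacity}, _request) when size >= capacity do
    {:error, :bucket_full}
  end

  # There's room: append the request to the end of the queue.
  def push(%__MODULE__{queue: queue, size: size} = bucket, request) do
    {:ok, %{bucket | queue: :queue.in(request, queue), size: size + 1}}
  end

  # Called at a regular interval: take the oldest request out for processing.
  def leak(%__MODULE__{queue: queue, size: size} = bucket) do
    case :queue.out(queue) do
      {{:value, request}, rest} -> {request, %{bucket | queue: rest, size: size - 1}}
      {:empty, _queue} -> {nil, bucket}
    end
  end
end
```

With a capacity of two, a third `push/2` is rejected until `leak/1` has removed the oldest request, which is exactly the burst behavior described above.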
Fixed Window
The fixed window algorithm is based on keeping a count of requests made by a user in a fixed time interval, i.e. a window. New requests increment this counter, and future requests are discarded if a usage threshold is hit.
This algorithm has low memory requirements: Only a count of requests per user and the timestamp of the window they occurred in are required. The window is usually identified by flooring the current timestamp to a multiple of the window size. For example, if the window is 60 seconds wide, a request made at 12:10:30 would count toward the 12:10 window. The stored count resets when a request falls into a later window.
The main issue with this algorithm is that bursts of traffic made near the window boundary can result in a much higher usage than the desired rate limit, since the requests falling to the current and the next window could be allowed. Another problem lies in the way the window boundary is calculated: If the same window boundaries are used for all users, they could all hit the API on the window boundaries where the usage count is reset, in turn producing a usage spike.
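The boundary problem is easy to reproduce with a small sketch. The module below is hypothetical and keeps the counters in a plain map passed in and out of `track/3`; the 60-second window and the limit of 10 are example values.

```elixir
defmodule FixedWindowSketch do
  # Example values, chosen for illustration only.
  @window_size_ms 60_000
  @limit 10

  # Floors a millisecond timestamp to the start of its window.
  def window_start(timestamp_ms), do: timestamp_ms - rem(timestamp_ms, @window_size_ms)

  # `counters` maps each user to `{window_start, count}`.
  # Returns `{result, updated_counters}`.
  def track(counters, user, timestamp_ms) do
    window = window_start(timestamp_ms)

    count =
      case Map.get(counters, user) do
        {^window, count} -> count
        # No counter yet, or the stored one belongs to an older window: reset.
        _other -> 0
      end

    if count >= @limit do
      {{:error, :limit_reached}, counters}
    else
      {:ok, Map.put(counters, user, {window, count + 1})}
    end
  end
end
```

Sending 10 requests at timestamp `59_999` and another 10 at `60_000` passes all 20 of them, even though they arrive one millisecond apart, because they fall into two different windows.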
Sliding Log
The sliding log is the most precise rate limiting algorithm. The idea behind it is to keep a log with a timestamp for each request. To determine whether a request should be limited, we count the log entries that fall within the last window and check if they’re over the desired rate limiting threshold.
The benefit of this algorithm is that the enforced rate limits are precise, since we can always calculate the most up-to-date request rate. The main negative is that it could be fairly expensive to store the logs for each historical request and calculate the number of recent requests. In addition, usage in a distributed environment requires transferring relatively large amounts of data between nodes to keep the log up to date.
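As a rough sketch, a per-user sliding log can be a list of timestamps that's filtered on every request. The module name and the example window and limit below are assumptions, and a production version would need a more efficient data structure than a list.

```elixir
defmodule SlidingLogSketch do
  # Example values, chosen for illustration only.
  @window_size_ms 60_000
  @limit 10

  # `log` holds the timestamps of one user's accepted requests, newest first.
  # Returns `{result, updated_log}`.
  def track(log, timestamp_ms) do
    # Keep only the entries inside the window that ends at `timestamp_ms`.
    recent = Enum.filter(log, fn logged -> logged > timestamp_ms - @window_size_ms end)

    if length(recent) >= @limit do
      {{:error, :limit_reached}, recent}
    else
      {:ok, [timestamp_ms | recent]}
    end
  end
end
```

Unlike the fixed window, 10 requests at `59_999` followed by one at `60_000` results in a rejection here, since all 10 logged requests are still within the last 60 seconds.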
Sliding Window
The sliding window algorithm is an extension of a fixed window algorithm that takes some inspiration from the sliding log algorithm to fix the window boundary issue of the original algorithm. As in the case of a fixed window, we keep count of the window usage. But instead of considering only the current window, we also use the value of the previous window. The usage is then calculated as the current window usage plus the weighted value of the previous window’s usage based on how much time already passed in the current window. For example, if 25 percent of the current window already elapsed, we use 75 percent of the previous window usage.
This algorithm provides a good compromise between the precise sliding log algorithm and the nice memory usage and performance characteristics of the fixed window — all while avoiding the usage spike issues of the latter.
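The weighting can be written as a one-line formula. The sketch below uses a hypothetical `estimated_usage/4` function to work through the 25 percent example from the text.

```elixir
defmodule SlidingWindowSketch do
  # Estimated usage = current window count plus the previous window count,
  # weighted by the share of the current window that has NOT elapsed yet.
  def estimated_usage(current_count, previous_count, elapsed_ms, window_size_ms) do
    elapsed_ratio = elapsed_ms / window_size_ms
    current_count + (1 - elapsed_ratio) * previous_count
  end
end

# 15 seconds into a 60-second window (25 percent elapsed), with 8 requests
# in the previous window and 3 in the current one: 3 + 0.75 * 8.
IO.inspect(SlidingWindowSketch.estimated_usage(3, 8, 15_000, 60_000))
```

The estimate assumes that requests in the previous window were spread evenly, which is the approximation that lets this algorithm get by with two counters instead of a full log.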
Implementation
Due to the benefits of the sliding window algorithm, we decided to use it for our rate limiting needs in PSPDFKit API. The rest of this post will guide you through the implementation of this algorithm in Elixir.
Rate Limiter Module
To encapsulate the rate limiting API and the actual rate limiting business logic, we’ll define a RateLimiter module. This module will be responsible for:
- Tracking new usage and deciding whether a certain request should be allowed if it falls under the rate limiting threshold.
- Regularly pruning the unneeded usage data, i.e. removing data that’s older than the current and previous windows.
- Syncing usage data between nodes in a cluster, since we’re running in a distributed environment.
To meet all these requirements, especially the ability to schedule regular tasks for syncing and pruning data and to receive sync requests from other nodes, we decided to make the module a GenServer:
    defmodule RateLimiter do
      use GenServer

      @doc """
      Starts the RateLimiter GenServer process linked to the current process.
      """
      # Accepts (and ignores) one argument so that the default `child_spec/1`
      # generated by `use GenServer` can start the process from a supervisor.
      def start_link(_opts \\ []) do
        GenServer.start_link(__MODULE__, %{rate_limit_settings: get_rate_limit_settings()})
      end

      @impl true
      def init(%{rate_limit_settings: rate_limit_settings} = _init_args) do
        # Return the `GenServer` state.
        {
          :ok,
          %{
            rate_limit_settings: rate_limit_settings
          }
        }
      end
    end
Our rate limiting implementation relies on multiple constants that can be tweaked based on the actual use case of the rate limiter. We extracted these settings to a separate function, get_rate_limit_settings. You’ll probably want to have them in your application’s configuration, but for the sake of this post, we’ll hardcode test values in the function itself:
    defp get_rate_limit_settings do
      %{
        # Size of the rate limiting window in milliseconds.
        window_size_milliseconds: 60 * 1000,
        # Rate limit, i.e. the maximum number of requests allowed for the window.
        maximum_request_count: 10,
        # Interval to sync usage to other nodes in a cluster.
        sync_interval_milliseconds: 10 * 1000,
        # Interval for removing outdated data from the usage table.
        cleanup_interval_milliseconds: 120 * 1000
      }
    end
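If you'd rather read these settings from your application's configuration, one possible approach is sketched below. The `:my_app` application name and the `:rate_limiter` config key are placeholders, not names from our codebase; the hardcoded values act as defaults.

```elixir
defmodule RateLimiterConfigSketch do
  # Defaults mirror the hardcoded test values from the post.
  @defaults %{
    window_size_milliseconds: 60 * 1000,
    maximum_request_count: 10,
    sync_interval_milliseconds: 10 * 1000,
    cleanup_interval_milliseconds: 120 * 1000
  }

  # `:my_app` and `:rate_limiter` are hypothetical names; replace them
  # with your own OTP application and configuration key.
  def get_rate_limit_settings do
    overrides = Application.get_env(:my_app, :rate_limiter, [])
    Map.merge(@defaults, Map.new(overrides))
  end
end
```

With this in place, a line such as `config :my_app, :rate_limiter, maximum_request_count: 5` in a config file would override only that one value.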
We can now move on to the most important part of our implementation — designing an appropriate model for our usage data.
Data Model
The simplest solution for storing our usage data could be to store it in a database (we use Postgres as the data layer of our app). This has the benefit of being centralized, thus keeping the usage data up to date across the whole cluster. However, the rate limiting needs to be executed for each request, so this would introduce a database operation for each request made to our API endpoints.
To achieve the best performance and prevent possible database bottlenecks, we decided on an in-memory data model. This choice trades the strong consistency guarantees of the database approach for the eventual consistency of syncing the local usage data to other nodes in the cluster.
Elixir provides ETS, which is both powerful and efficient, as its in-memory storage. Be warned: ETS comes from Erlang, so you should prepare for an API that’s slightly different than what’s available in Elixir’s standard library.
One of its main benefits for our rate limiting use case is that it provides support for concurrent access from multiple processes. This means that API requests from different users can read and write from an ETS table at the same time with only a little synchronization overhead. This dramatically reduces the impact of users on each other compared to, for example, using a GenServer for holding the rate limiting counters, where the requests would be dispatched one by one, causing a potential bottleneck.
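To illustrate the concurrent access, here's a small standalone sketch in which many tasks increment the same counter in a public ETS table. The module, table, and key names are made up for this example and aren't part of our rate limiter.

```elixir
defmodule ConcurrentCounterSketch do
  # Hypothetical demo: increments one counter from many processes at once.
  def run(request_count) do
    # A `:public` table can be written to by any process, not only its owner.
    table = :ets.new(:concurrent_counter_sketch, [:set, :public])

    1..request_count
    |> Task.async_stream(
      fn _ ->
        # Atomically increment element 2 of the `{:requests, count}` tuple,
        # inserting `{:requests, 0}` first if the key doesn't exist yet.
        :ets.update_counter(table, :requests, {2, 1}, {:requests, 0})
      end,
      max_concurrency: 8
    )
    |> Stream.run()

    [{:requests, total}] = :ets.lookup(table, :requests)
    :ets.delete(table)
    total
  end
end
```

Because `:ets.update_counter/4` is atomic, no increments are lost even though the tasks never coordinate with each other or go through a single owning process.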
ETS is based on tables where each table stores data as tuples. Instead of using raw tuples, we decided to use the Record module to work with the data. Records are tuples, but they provide a convenient API for working with their fields. This makes for more readable code, since the tuple elements are named. Records still maintain the same performance characteristics as tuples, since their operations are translated to raw tuple operations at compile time (using macros).
Let’s define a record for our usage data:
    require Record

    Record.defrecordp(
      # A unique name of the record.
      :usage,
      # Record key, a tuple of `{api_key :: String.t(), window_start_timestamp :: integer()}`.
      key: nil,
      # Usage count for the window, i.e. the number of requests made since the window started.
      usage: 0,
      # Usage count that isn't yet synced to other nodes in a cluster.
      pending_usage: 0
    )
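To see what such a record looks like at runtime, the following sketch defines the same record in a throwaway module (the module name and `demo/0` function are hypothetical) and returns the underlying tuple together with the zero-based index of the `key` field.

```elixir
defmodule UsageRecordSketch do
  require Record

  # Same shape as the `usage` record defined in the post.
  Record.defrecordp(:usage, key: nil, usage: 0, pending_usage: 0)

  def demo do
    record = usage(key: {"api_key", 0}, usage: 3)

    %{
      # The record is a plain tuple tagged with the record name.
      record: record,
      # Zero-based index of the `key` field (index 0 holds the `:usage` tag).
      key_index: usage(:key),
      # Reading a single field by name.
      usage_field: usage(record, :usage)
    }
  end
end

IO.inspect(UsageRecordSketch.demo())
```

The record is the tuple `{:usage, {"api_key", 0}, 3, 0}`, and the index of `key` is `1`. ETS counts tuple positions from `1`, which is why positions passed to ETS need the extra `+ 1`.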
The ETS table needs to be created before its first use. We’ll do so while initializing our GenServer:
    # To keep things simple, we'll use the module name as the name of our ETS table.
    defp table_name(), do: __MODULE__

    def init(%{rate_limit_settings: rate_limit_settings} = _init_args) do
      :ets.new(
        table_name(),
        [
          # Our table is a set of values, i.e. each key in the table needs to be unique.
          :set,
          # We'll access this table from processes that handle our requests.
          # Setting it to `:public` makes it accessible from any process.
          :public,
          # Named tables can be accessed using their name.
          :named_table,
          # Inform the ETS about the index of our record keys.
          # Note that this ETS property is indexed from `1`: Erlang uses
          # `1`-based indexes for tuples, while Elixir tuples are `0`-based.
          keypos: usage(:key) + 1
        ]
      )

      # Return the `GenServer` state.
      {
        :ok,
        %{
          rate_limit_settings: rate_limit_settings
        }
      }
    end
Uff, that was a mouthful. I think my small note about ETS coming from Erlang is now sinking in. :) But I promise this will be worth it in the end: ETS is battle tested and highly performant, and because of this, it’s a standard way of handling shared memory data in Elixir.
Rate Limiting
We can now sink our teeth into the actual implementation of our rate limiting algorithm. We’ll add a function to track the request and return :ok for allowed requests and {:error, :limit_reached} for those that should be limited:
    @spec track(String.t()) :: :ok | {:error, :limit_reached}
    def track(api_key) do
      %{window_size_milliseconds: window_size, maximum_request_count: rate_limit} =
        get_rate_limit_settings()

      # ...
    end
As mentioned in the previous section, we’ll update the usage data directly in the process that handles the request (i.e. inside the track/1 function), so that our GenServer doesn’t become a bottleneck.
First, we need to update the usage counter for the current window. ETS ships with a convenient update_counter function, which increments and gets the usage in one atomic operation:
    def track(api_key) do
      # ...

      # Calculate the start of the current window.
      current_timestamp = now()
      current_window_start = window_start(current_timestamp, window_size)

      # And use it in the record key.
      record_key = {api_key, current_window_start}

      [current_window_usage, _current_window_pending_usage] =
        :ets.update_counter(
          table_name(),
          record_key,
          # Increment the `usage` and `pending_usage` values.
          # The syntax here expects the position in the record (again `1`-based).
          [{usage(:usage) + 1, 1}, {usage(:pending_usage) + 1, 1}],
          # We need to provide the default item in case there's
          # not yet a record for the current window.
          usage(key: record_key, usage: 0, pending_usage: 0)
        )

      # The rest of the rate limiting logic.
      # ...
    end

    defp window_start(timestamp, window_size) do
      timestamp - rem(timestamp, window_size)
    end

    defp now do
      System.system_time(:millisecond)
    end
We’ll continue with implementing the sliding window algorithm:
    def track(api_key) do
      # ...

      # Calculate the start of the previous window.
      previous_window_start = current_window_start - window_size

      # Query the table for the previous window usage record.
      previous_window_usage =
        case :ets.lookup(table_name(), {api_key, previous_window_start}) do
          [] ->
            # No previous window usage record.
            0

          [usage(usage: previous_window_usage)] ->
            previous_window_usage
        end

      # Calculate how much of the current window already elapsed.
      window_elapsed_ratio = (current_timestamp - current_window_start) / window_size

      # Then calculate the usage with the sliding window algorithm formula.
      usage = current_window_usage + (1 - window_elapsed_ratio) * previous_window_usage

      if usage > rate_limit do
        # We already incremented the usage counters. We need to fix
        # the usage record since the request will be rate limited.
        :ets.update_counter(
          table_name(),
          record_key,
          [{usage(:usage) + 1, -1}, {usage(:pending_usage) + 1, -1}]
        )

        {:error, :limit_reached}
      else
        # Request should be allowed.
        :ok
      end
    end
Our track/1 function is now fully functional and can track requests:
    # The rate limit from our settings, repeated here so the example is complete.
    maximum_request_count = 10

    # Execute `maximum_request_count` requests in a loop to reach the request limit.
    for _ <- 1..maximum_request_count do
      :ok = RateLimiter.track("test_api_key")
    end

    # The next request is limited, as expected.
    {:error, :limit_reached} = RateLimiter.track("test_api_key")

    # Requests with other API keys are still allowed.
    :ok = RateLimiter.track("test_api_key_2")
Next, we’ll do something with the outdated usage data.
Pruning the Outdated Usage Data
Our usage tracking implementation is still missing one important feature: We’re only adding data to the usage table, but we don’t remove it after it’s no longer needed. To prevent any possible future memory issues, we need to make sure to regularly prune this outdated data to keep the memory usage in check.
To do so, we’ll execute the cleanup in regularly spaced intervals. This can be done by sending a delayed message to our GenServer. We’ll schedule the first message in the init/1 function:
    def init(%{rate_limit_settings: rate_limit_settings} = _init_args) do
      # ...

      # Schedule outdated data cleanup. Scheduled regularly at data cleanup interval.
      Process.send_after(self(), :perform_cleanup, rate_limit_settings.cleanup_interval_milliseconds)

      # ...
    end
The :perform_cleanup message will now be delivered to our GenServer after the cleanup interval. We’ll handle this message in a handle_info/2 function:
    def handle_info(
          :perform_cleanup,
          %{
            rate_limit_settings: %{
              window_size_milliseconds: window_size,
              cleanup_interval_milliseconds: cleanup_interval
            }
          } = state
        ) do
      # Clean up records with windows that are older than the previous window.
      previous_window_start = window_start(now(), window_size) - window_size

      # Atomically query and delete usage records that are older than the previous window.
      :ets.select_delete(
        table_name(),
        # :ets.fun2ms(fn {:usage, {_, timestamp}, _, _} when timestamp < previous_window_start -> true end)
        [
          {
            {:usage, {:_, :"$1"}, :_, :_},
            [{:<, :"$1", previous_window_start}],
            [true]
          }
        ]
      )

      # Schedule the next cleanup cycle.
      Process.send_after(self(), :perform_cleanup, cleanup_interval)

      {:noreply, state}
    end
The most interesting part of this function is the usage of :ets.select_delete/2, which matches the objects in an ETS table using a rather esoteric match specification syntax. If the match specification returns true for an object, the object is removed from the table. Fortunately, we don’t need to write the match specifications manually. ETS ships with the :ets.fun2ms/1 function, which converts a valid function (i.e. one that can be converted into a match specification) into a corresponding match specification. It can be executed from an interactive Elixir shell. Note that it’s not possible to use variables in the :ets.fun2ms function call, so we’ll use a hardcoded nonsensical timestamp value that will get replaced with the previous_window_start in the final match specification:
    iex(1)> :ets.fun2ms(fn {:usage, {_, timestamp}, _, _} when timestamp < -1000 -> true end)
    [{{:usage, {:_, :"$1"}, :_, :_}, [{:<, :"$1", -1000}], [true]}]
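Since match specifications are hard to read, it can help to check a handwritten one against sample records before pointing it at a real table. The sketch below does this with `:ets.test_ms/2`; the `MatchSpecSketch` module and its function names are hypothetical helpers, not part of our rate limiter.

```elixir
defmodule MatchSpecSketch do
  # Builds the cleanup match specification for a given window start,
  # with the hardcoded placeholder replaced by the real value.
  def older_than(previous_window_start) do
    [
      {
        {:usage, {:_, :"$1"}, :_, :_},
        [{:<, :"$1", previous_window_start}],
        [true]
      }
    ]
  end

  # `:ets.test_ms/2` runs a match specification against a single tuple.
  # It returns `{:ok, false}` if the tuple doesn't match.
  def matches?(record, previous_window_start) do
    {:ok, result} = :ets.test_ms(record, older_than(previous_window_start))
    result == true
  end
end

# A record from window `0` is older than a previous window starting at `60_000`.
IO.inspect(MatchSpecSketch.matches?({:usage, {"api_key", 0}, 1, 0}, 60_000))
# A record from the previous window itself must be kept.
IO.inspect(MatchSpecSketch.matches?({:usage, {"api_key", 60_000}, 1, 0}, 60_000))
```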
We’re nearly done.
Synchronizing the Usage Data across a Cluster
Our usage tracker is now fully functional in the context of a single node. Each node in our solution tracks the usage data locally. We could leave it at that, but this could lead to unpredictable rate limits if our API users end up using different nodes: each node would allow up to the maximum number of requests permitted by our rate limit, since it has no way of knowing the actual usage on other nodes.
To solve this issue, we decided to periodically sync local usage data to all nodes in a cluster. You probably noticed that we introduced a separate pending_usage field in our usage record, and we increment it inside the track/1 function. We’re using this field to track the window usage that wasn’t yet synced to other nodes in a cluster.
We’ll use the publish-subscribe pattern to communicate between our nodes. Phoenix.PubSub provides a ready-to-use implementation of this pattern. To use it, we first need to set it up by creating a named PubSub instance in our supervision tree:
    {Phoenix.PubSub, name: :pub_sub}
We’ll choose a PubSub topic name for our usage tracker and subscribe our rate limiter GenServer to it. And since we’re already modifying the init function, let’s schedule a delayed message, :perform_sync, to set up the first usage sync:
    defmodule RateLimiter do
      # ...

      @pub_sub_sync_usage_topic "sync_usage_tracker_topic"

      def init(%{rate_limit_settings: rate_limit_settings} = _init_args) do
        # ...

        Phoenix.PubSub.subscribe(:pub_sub, @pub_sub_sync_usage_topic)

        # Schedule usage broadcast to keep other nodes up to date.
        # Scheduled regularly at usage sync intervals.
        Process.send_after(self(), :perform_sync, rate_limit_settings.sync_interval_milliseconds)

        # ...
      end

      # ...
    end
Our GenServer will collect any pending usage and push it to other nodes when handling the :perform_sync message:
    def handle_info(
          :perform_sync,
          %{
            rate_limit_settings: %{
              window_size_milliseconds: window_size,
              sync_interval_milliseconds: sync_interval
            }
          } = state
        ) do
      # We only care about current and previous windows.
      previous_window_start = window_start(now(), window_size) - window_size

      # Extract relevant records that have pending usage since the previous window.
      # Returns `[{{key, timestamp}, pending_usage}]`.
      pending_usage =
        :ets.select(
          table_name(),
          # :ets.fun2ms(
          #   fn {:usage, {key, timestamp}, usage, pending_usage}
          #      when timestamp >= previous_window_start and pending_usage > 0 ->
          #     {{key, timestamp}, pending_usage}
          #   end
          # )
          [
            {
              {:usage, {:"$1", :"$2"}, :_, :"$4"},
              [{:andalso, {:>=, :"$2", previous_window_start}, {:>, :"$4", 0}}],
              [{{{{:"$1", :"$2"}}, :"$4"}}]
            }
          ]
        )

      case pending_usage do
        [] ->
          # No pending usage; we're done.
          :ok

        pending_usage ->
          # Broadcast the pending usage to other nodes.
          Phoenix.PubSub.broadcast(
            :pub_sub,
            @pub_sub_sync_usage_topic,
            # Sync message payload that contains the pending usage and a PID of the sending process.
            {:sync, pending_usage, self()}
          )

          # Decrement the pending usage in the table. Any remaining pending usage that could
          # have been tracked in the meantime will be synced in the next sync cycle.
          Enum.each(
            pending_usage,
            fn {key, pending_usage} ->
              :ets.update_counter(
                table_name(),
                key,
                {usage(:pending_usage) + 1, -pending_usage}
              )
            end
          )
      end

      # Schedule next sync cycle.
      Process.send_after(self(), :perform_sync, sync_interval)

      {:noreply, state}
    end
And we’ll handle any sync notifications by handling the :sync message:
    # If the sync message originated from the current process, ignore it.
    def handle_info({:sync, _usage, from}, state) when from == self(), do: {:noreply, state}

    # Otherwise, update the usage table with received usage data.
    def handle_info(
          {:sync, usage, _from},
          state
        ) do
      Enum.each(
        usage,
        fn {key, pending_usage} ->
          :ets.update_counter(
            table_name(),
            key,
            {usage(:usage) + 1, pending_usage},
            usage(key: key, usage: 0, pending_usage: 0)
          )
        end
      )

      {:noreply, state}
    end
Now, let’s put it all together.
Putting It Together
To be able to use our RateLimiter GenServer, we need to put it in a supervision tree. In this example, we’ll put it into our app’s supervision tree (together with the PubSub instance mentioned in the previous section):
    defmodule MyApp.Application do
      @moduledoc false

      use Application
      require Logger

      def start(_type, _args) do
        children = [
          # ...
          {Phoenix.PubSub, name: :pub_sub},
          RateLimiter
        ]

        Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
      end
    end
The only task that remains is to build a module that will connect our requests with the RateLimiter module. To make this idiomatic Elixir, we’ll use the Plug behavior. This plug is going to be executed for each request. It’ll call RateLimiter.track/1 to track the usage, returning the original connection on an :ok result and responding with an error when the request should be rate limited. We’ll return the standard HTTP 429 (Too Many Requests) status in this case:
    defmodule ApiKeyRateLimiter do
      @behaviour Plug

      import Plug.Conn

      @impl true
      def init(options) do
        options
      end

      @impl true
      def call(
            %{
              assigns: %{
                api_key: test_api_key
              }
            } = conn,
            _opts
          ) do
        case RateLimiter.track(test_api_key) do
          # The request is allowed.
          :ok ->
            conn

          {:error, :limit_reached} ->
            # The request is rate limited.
            conn
            |> put_resp_content_type("application/json")
            |> send_resp(
              429,
              Jason.encode!(%{
                details: "Too many requests for the test API key.",
                status: 429
              })
            )
            |> halt()
        end
      end
    end
Finally, we’ll set up this plug to be called for all of our endpoint calls by using it in our endpoint controller:
    defmodule ApiController do
      # ...

      plug ApiKeyRateLimiter

      # ...
    end
That’s it!
Conclusion
This post covered various approaches to request rate limiting and explained our implementation in detail. I hope you’ll find it useful if you’re building similar functionality. I also believe that this post can serve as an example of how GenServers simplify the implementation of distributed systems in Elixir.