
Using Stress Tests to Catch Race Conditions in API Rate Limiting Logic

Jul 20th 2025 15 min read
medium
python3.13.5
performance
stress
api

As QA automation engineers, we often think of stress testing and functional testing as separate disciplines. Stress tests find performance bottlenecks, functional tests find bugs – or so we assume. But what if I told you that some of your most elusive functional bugs are hiding in plain sight, only revealing themselves under the concurrent load conditions that stress tests create?

Today, we'll explore a specific but common scenario where this happens: race conditions in API rate limiting logic. You'll learn how to design targeted stress tests that don't just measure performance, but actively hunt for concurrency bugs that traditional functional tests miss entirely.

The Hidden Problem with Rate Limiting

Rate limiting seems straightforward in theory. You track how many requests a client has made in a time window and reject requests that exceed your threshold. Simple, right?

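The reality is more complex. Most rate limiting implementations involve multiple steps: read the client's current request count, check it against the limit, and then record the new request.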

This check-then-act pattern creates a classic race condition window. When multiple requests arrive simultaneously, they might all read the same "current count" before any of them increment it, effectively bypassing your rate limit.
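
The window can be made visible in isolation. The following is a minimal sketch, not part of the article's server code: a hypothetical `racy_increment` helper uses a `threading.Barrier` to force every thread to finish its read before any thread writes, which is the worst-case interleaving described above.

```python
import threading

NUM_THREADS = 5
count = 0  # shared "current count" for one client
barrier = threading.Barrier(NUM_THREADS)


def racy_increment():
    """Check-then-act without synchronization (hypothetical helper)."""
    global count
    seen = count        # step 1: read the current count
    barrier.wait()      # force every thread to read before anyone writes
    count = seen + 1    # step 2: write back a value based on a stale read


threads = [threading.Thread(target=racy_increment) for _ in range(NUM_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"{NUM_THREADS} increments ran, final count = {count}")
```

Every thread reads 0 and writes 1, so five increments leave the counter at 1. In a real server nothing forces this ordering, which is why the bug shows up only some of the time.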

Here's the insidious part: this bug is nearly impossible to catch with sequential functional tests. Your test sends one request, waits for a response, sends another – everything works perfectly. The race condition only emerges when requests arrive concurrently, which rarely happens in traditional test suites but is guaranteed to happen in production.

A Real-World Example

Let's examine a simplified rate limiter that demonstrates this race condition. While this example uses an in-memory store, the same race condition pattern appears in Redis-based implementations, database-backed counters, and even some commercial API gateways.

The Test Server Setup

We'll create a Flask server that demonstrates both broken and fixed rate limiting implementations. This will give us a controlled environment to test race conditions.

Basic Flask Server Structure

First, let's set up the basic Flask application with imports and initialization:

                
from flask import Flask, request, jsonify
import time
from collections import defaultdict
from threading import Lock

app = Flask(__name__)
                
The Broken Rate Limiter Implementation

The broken rate limiter looks deceptively correct but has a critical race condition. The key issue is that it reads the current state, processes it, and then writes back - all without any synchronization between concurrent requests:

                
class BrokenRateLimiter:

    def __init__(self):
        self.requests = defaultdict(list)
        # Intentionally NOT using a lock to demonstrate race condition

    def is_allowed(self, client_id, limit=10, window_seconds=60):
        now = time.time()

        # RACE CONDITION: Multiple threads can read the same length
        # before any of them append their timestamp
        current_requests = [
            timestamp for timestamp in self.requests[client_id]
            if now - timestamp < window_seconds
        ]

        if len(current_requests) < limit:
            # Simulate some processing time to make race condition more likely
            time.sleep(0.001)
            self.requests[client_id] = current_requests + [now]
            return True

        return False
                

The time.sleep(0.001) artificially increases the window where the race condition can occur, making it easier to reproduce in testing.
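
Before involving HTTP at all, the limiter can be exercised directly from threads. This is a rough sketch under a few assumptions: `count_allowed` is a hypothetical helper, the class is a compact copy of the limiter above so the snippet runs on its own, and 30 threads is an arbitrary choice.

```python
import threading
import time
from collections import defaultdict


class BrokenRateLimiter:
    """Compact copy of the limiter above so this snippet runs on its own."""

    def __init__(self):
        self.requests = defaultdict(list)

    def is_allowed(self, client_id, limit=10, window_seconds=60):
        now = time.time()
        current = [t for t in self.requests[client_id] if now - t < window_seconds]
        if len(current) < limit:
            time.sleep(0.001)  # widen the race window, as in the article
            self.requests[client_id] = current + [now]
            return True
        return False


def count_allowed(limiter, num_threads=30, limit=10):
    """Call is_allowed() from many threads at once; return how many passed."""
    start = threading.Barrier(num_threads)
    outcomes = []  # list.append is thread-safe in CPython

    def worker():
        start.wait()  # release all threads together
        outcomes.append(limiter.is_allowed("client", limit=limit))

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(outcomes)


allowed = count_allowed(BrokenRateLimiter())
print(f"allowed {allowed} of 30 requests (limit is 10)")
```

The exact number varies between runs because thread scheduling is nondeterministic, but a correct limiter would never report more than 10.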

The Fixed Rate Limiter Implementation

The fixed version solves the race condition by using a lock to ensure atomic operations. Only one thread can execute the critical section at a time:

                
class FixedRateLimiter:

    def __init__(self):
        self.requests = defaultdict(list)
        self.lock = Lock()

    def is_allowed(self, client_id, limit=10, window_seconds=60):
        with self.lock:  # This prevents race conditions
            now = time.time()

            # Clean old requests
            self.requests[client_id] = [
                timestamp for timestamp in self.requests[client_id]
                if now - timestamp < window_seconds
            ]

            # Check if under limit
            if len(self.requests[client_id]) < limit:
                self.requests[client_id].append(now)
                return True

            return False
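
A quick way to gain confidence in the locked version is to call it from a thread pool and count the approvals. This is a sketch, not the article's test: the class is a compact copy of the one above so the snippet is self-contained, and the pool size of 30 is an arbitrary assumption. The pool may not run every call at the same instant, but the lock makes the outcome independent of the interleaving.

```python
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from threading import Lock


class FixedRateLimiter:
    """Compact copy of the locked limiter above so this snippet runs on its own."""

    def __init__(self):
        self.requests = defaultdict(list)
        self.lock = Lock()

    def is_allowed(self, client_id, limit=10, window_seconds=60):
        with self.lock:  # read, check, and write happen as one unit
            now = time.time()
            recent = [t for t in self.requests[client_id] if now - t < window_seconds]
            self.requests[client_id] = recent
            if len(recent) < limit:
                recent.append(now)
                return True
            return False


limiter = FixedRateLimiter()
with ThreadPoolExecutor(max_workers=30) as pool:
    # 30 calls for one client, spread across the pool's worker threads
    outcomes = list(pool.map(lambda _: limiter.is_allowed("client"), range(30)))

allowed = sum(outcomes)
print(f"allowed {allowed} of {len(outcomes)} requests")
```

With the default limit of 10, exactly 10 of the 30 calls are approved no matter how the threads are scheduled.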
                
API Endpoints

Now we'll create the Flask endpoints that use our rate limiter. The main endpoint applies rate limiting, while a status endpoint lets us inspect the current state:

                
# Choose which rate limiter to test
# Use BrokenRateLimiter to see the race condition
rate_limiter = BrokenRateLimiter()

@app.route('/api/data', methods=['GET'])
def get_data():
    client_id = request.headers.get('X-Client-ID', 'default')

    if rate_limiter.is_allowed(client_id, limit=10, window_seconds=60):
        return jsonify({
            "status": "success",
            "data": "Your API response data here",
            "timestamp": time.time(),
            "client_id": client_id
        }), 200
    else:
        return jsonify({
            "error": "Rate limit exceeded",
            "client_id": client_id,
            "timestamp": time.time()
        }), 429

@app.route('/api/status', methods=['GET'])
def get_status():
    """Endpoint to check current rate limit status"""
    client_id = request.headers.get('X-Client-ID', 'default')
    now = time.time()
    # .get() avoids creating an empty entry for clients we have never seen
    current_count = len([
        ts for ts in rate_limiter.requests.get(client_id, [])
        if now - ts < 60
    ])

    return jsonify({
        "client_id": client_id,
        "current_requests_in_window": current_count,
        "limit": 10,
        "window_seconds": 60
    })
                
Server Startup

Finally, we configure the server to run with threading enabled, which is crucial for reproducing the race condition:

                
if __name__ == '__main__':    
    app.run(host='0.0.0.0', port=8000, threaded=True, debug=False)
                

Why This Breaks Under Load

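The BrokenRateLimiter looks almost identical to a correct implementation, but has a critical flaw in the gap between reading and writing the request list. Suppose a client already has 5 timestamps in its window and 3 more requests arrive at the same moment. Each of the three reads the same list of 5, passes the limit check, and then writes back its own copy of that list plus one new timestamp, overwriting whatever the others stored.

The result? Instead of having 8 requests (5 + 3 new), we end up with only 6. These lost updates keep the stored count artificially low, so the limiter goes on admitting requests after the client has actually exceeded the limit.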
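
The 5 + 3 scenario can be replayed deterministically without any threads by performing the reads and writes in the problematic order by hand. This is an illustrative sketch; the timestamp values are made up.

```python
window = [1.0, 2.0, 3.0, 4.0, 5.0]  # 5 timestamps already stored for the client
new_timestamps = [6.0, 6.1, 6.2]    # 3 requests arriving together (made-up values)

# Phase 1: every request takes its snapshot before any request writes.
snapshots = [list(window) for _ in new_timestamps]

# Phase 2: each request writes back "its snapshot + its own timestamp".
stored = window
for snapshot, ts in zip(snapshots, new_timestamps):
    stored = snapshot + [ts]  # overwrites whatever the previous request stored

print(len(stored))  # 6, not 8: two of the three appends were lost
print(stored)
```

All three requests were approved, yet only the last writer's timestamp survives in the stored list.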

In normal sequential testing, this rate limiter works perfectly because there's no concurrent access. The race condition only emerges when multiple requests arrive simultaneously, which is guaranteed in production but rare in traditional test suites.
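
To make that concrete, here is the kind of sequential check a functional suite might contain. It is a sketch that calls the limiter directly rather than going through HTTP, and the class is a compact copy of the broken limiter so the snippet runs on its own.

```python
import time
from collections import defaultdict


class BrokenRateLimiter:
    """Compact copy of the unsynchronized limiter from earlier in the article."""

    def __init__(self):
        self.requests = defaultdict(list)

    def is_allowed(self, client_id, limit=10, window_seconds=60):
        now = time.time()
        current = [t for t in self.requests[client_id] if now - t < window_seconds]
        if len(current) < limit:
            time.sleep(0.001)
            self.requests[client_id] = current + [now]
            return True
        return False


limiter = BrokenRateLimiter()
# One call at a time, the way a typical functional test would drive the API
results = [limiter.is_allowed("client", limit=10) for _ in range(12)]

assert results == [True] * 10 + [False] * 2
print("sequential test passed: 10 allowed, 2 rejected")
```

The assertion holds on every run, so a suite built from tests like this stays green while the bug remains in place.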

The Stress Test That Exposes the Bug

Traditional functional tests send requests sequentially, which never triggers race conditions. We need a test that creates the exact concurrent conditions where these bugs hide.

Setting Up the Test Class

Our stress tester needs to handle concurrent HTTP requests and analyze the results for rate limiting violations:

                
import asyncio
import aiohttp
import time
from collections import defaultdict

class RateLimitStressTester:
    def __init__(self, base_url, endpoint="/api/data"):
        self.base_url = base_url
        self.endpoint = endpoint
        self.results = []
                
Creating Concurrent Request Batches

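The key to exposing race conditions is sending requests that arrive at nearly the same time. We use asyncio to create tight concurrent batches. This method and the ones in the following sections belong to the RateLimitStressTester class; they are shown here without the class-level indentation: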

                
async def send_concurrent_requests(self, client_id, num_requests=20,
                                   concurrent_batch_size=10):
    print(f"Sending {num_requests} requests for {client_id} in batches of {concurrent_batch_size}")

    async with aiohttp.ClientSession() as session:
        # Send requests in tight concurrent batches
        for batch_start in range(0, num_requests, concurrent_batch_size):
            batch_size = min(concurrent_batch_size,
                             num_requests - batch_start)

            print(f"  Sending batch {batch_start // concurrent_batch_size + 1}: {batch_size} concurrent requests")

            # Create all requests simultaneously
            tasks = []
            for i in range(batch_size):
                task = self.make_request(session, client_id,
                                         batch_start + i)
                tasks.append(task)

            # Execute them all at once
            batch_results = await asyncio.gather(*tasks)
            self.results.extend(batch_results)

            # Small delay between batches to ensure they're distinct
            await asyncio.sleep(0.05)
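
To see why `asyncio.gather` produces tight batches, it helps to try it with a stand-in for the HTTP call. This sketch makes no network requests: `fake_request` is a hypothetical coroutine that only sleeps, and the 0.05 second latency is an arbitrary assumption.

```python
import asyncio
import time


async def fake_request(request_num, latency=0.05):
    """Stand-in for an HTTP call (hypothetical; no network involved)."""
    await asyncio.sleep(latency)
    return request_num


async def send_batch(batch_size=10):
    # gather() schedules every coroutine before waiting on any of them
    tasks = [fake_request(i) for i in range(batch_size)]
    return await asyncio.gather(*tasks)


start = time.perf_counter()
results = asyncio.run(send_batch())
elapsed = time.perf_counter() - start

print(results)  # results come back in submission order
print(f"batch of 10 took {elapsed:.2f}s; one at a time would take about 0.50s")
```

Because all ten coroutines are in flight together, the batch finishes in roughly the time of a single request. That overlap is what makes the requests reach the server close enough together to collide.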
                
Making Individual Requests

Each request needs to capture timing and response data for later analysis:

                
async def make_request(self, session, client_id, request_num):
    headers = {"X-Client-ID": client_id}
    start_time = time.time()

    try:
        async with session.get(f"{self.base_url}{self.endpoint}",
                               headers=headers) as response:
            end_time = time.time()
            return {
                "request_num": request_num,
                "client_id": client_id,
                "status_code": response.status,
                "response_time": end_time - start_time,
                "timestamp": start_time
            }
    except Exception as e:
        return {
            "request_num": request_num,
            "client_id": client_id,
            "status_code": "ERROR",
            "error": str(e),
            "response_time": time.time() - start_time,
            "timestamp": start_time
        }
                
Analyzing Rate Limit Violations

This is where we detect the race conditions by examining successful request patterns and looking for violations of the expected rate limits:

                
def analyze_rate_limit_behavior(self, expected_limit=10, window_seconds=60):
    print(f"\nAnalyzing rate limit behavior...")
    print(f"Expected limit: {expected_limit} requests per {window_seconds} seconds")

    # Group by client and sort by timestamp
    clients = defaultdict(list)
    for result in self.results:
        clients[result["client_id"]].append(result)

    violations = []

    for client_id, requests in clients.items():
        requests.sort(key=lambda x: x["timestamp"])
        successful_requests = [r for r in requests if r["status_code"] == 200]

        print(f"\nClient {client_id}:")
        print(f"  Total requests: {len(requests)}")
        print(f"  Successful (200): {len(successful_requests)}")
        print(f"  Rate limited (429): {len([r for r in requests if r['status_code'] == 429])}")

        if successful_requests:
            # Check whether any window of window_seconds holds more successes than the limit
            max_in_window = 0
            violation_found = False

            for i, request in enumerate(successful_requests):
                window_start = request["timestamp"]
                window_end = window_start + window_seconds

                # Count requests in this window
                window_requests = [
                    r for r in successful_requests
                    if window_start <= r["timestamp"] < window_end
                ]

                current_window_count = len(window_requests)
                max_in_window = max(max_in_window, current_window_count)

                # Record at most one violation per client (the first window over the limit)
                if current_window_count > expected_limit and not violation_found:
                    violations.append({
                        "client_id": client_id,
                        "window_start": window_start,
                        "expected_limit": expected_limit,
                        "actual_count": current_window_count,
                        "violation_amount": current_window_count - expected_limit
                    })
                    violation_found = True

            print(f"  Max requests in any {window_seconds}s window: {max_in_window}")
            if max_in_window > expected_limit:
                print(f"  VIOLATION: Exceeded limit by {max_in_window - expected_limit}")
            else:
                print("  No violations detected")

    return violations
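
The nested scan above compares every successful request with every other one, which is fine for a few dozen requests. For larger runs, a two-pointer pass over the sorted timestamps should give the same maximum in a single sweep. This is a sketch under that assumption; `max_requests_in_window` is a hypothetical helper, not part of the tester class.

```python
def max_requests_in_window(timestamps, window_seconds):
    """Largest number of timestamps falling in any [t, t + window_seconds) span.

    Each candidate window starts at one of the timestamps, matching the
    window definition used in analyze_rate_limit_behavior().
    """
    ts = sorted(timestamps)
    best = 0
    end = 0  # index of the first timestamp outside the current window
    for start in range(len(ts)):
        while end < len(ts) and ts[end] < ts[start] + window_seconds:
            end += 1
        best = max(best, end - start)
    return best


# Three requests close together, then two more just over a minute later
print(max_requests_in_window([0, 1, 2, 61, 62], window_seconds=60))  # 3
```

Comparing the returned value with the expected limit gives the same pass or fail answer as the `max_in_window` check.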
                
Running the Complete Test

Finally, we orchestrate the entire test and provide comprehensive reporting:

                
def run_test(self, client_ids=("client1", "client2"), requests_per_client=15):
    print("=" * 60)
    print("RATE LIMIT RACE CONDITION STRESS TEST")
    print("=" * 60)
    print(f"Target: {self.base_url}{self.endpoint}")
    print(f"Clients: {client_ids}")
    print(f"Requests per client: {requests_per_client}")
    print(f"Expected rate limit: 10 requests/minute per client")
    print("-" * 60)

    async def run_all_clients():
        tasks = []
        for client_id in client_ids:
            task = self.send_concurrent_requests(client_id, requests_per_client,
                                                 concurrent_batch_size=8)
            tasks.append(task)
        await asyncio.gather(*tasks)

    # Run the test
    start_time = time.time()
    asyncio.run(run_all_clients())
    total_time = time.time() - start_time

    print(f"\nTest completed in {total_time:.2f} seconds")

    # Analyze results
    violations = self.analyze_rate_limit_behavior()

    # Print final summary
    successful_responses = len([r for r in self.results if r["status_code"] == 200])
    rate_limited_responses = len([r for r in self.results if r["status_code"] == 429])

    print("\n" + "=" * 60)
    print("FINAL RESULTS")
    print("=" * 60)
    print(f"Total requests sent: {len(self.results)}")
    print(f"Successful responses (200): {successful_responses}")
    print(f"Rate limited responses (429): {rate_limited_responses}")
    print(f"Test duration: {total_time:.2f} seconds")
    print(f"Rate limit violations: {len(violations)}")

    if violations:
        print("\nRACE CONDITION DETECTED!")
        print("The following violations indicate race conditions in the rate limiter:")
        for violation in violations:
            print(f"  - Client {violation['client_id']}: "
                  f"{violation['actual_count']} requests allowed "
                  f"(expected max: {violation['expected_limit']}, "
                  f"excess: {violation['violation_amount']})")
        print("\nThis means multiple requests bypassed the rate limit due to concurrent access!")
    else:
        print("\nNo rate limit violations detected.")
        print("The rate limiter appears to be working correctly under concurrent load.")

    return {
        "total_requests": len(self.results),
        "successful_requests": successful_responses,
        "rate_limited_requests": rate_limited_responses,
        "violations": violations,
        "test_duration": total_time
    }
                
Test Execution

The test script includes a main execution block that runs the stress test with default configuration:

                
if __name__ == "__main__":
    # Test configuration
    tester = RateLimitStressTester("http://localhost:8000")

    # Run the test
    results = tester.run_test(
        client_ids=["test_client_1", "test_client_2"],
        requests_per_client=15
    )

    print(f"\nTest complete! Check the results above.")
                

Running the Test

To see the race condition in action:

1. Start the broken server:

                
python test_server.py
                

2. Run the stress test in another terminal:

                
python stress_test_client.py
                

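3. Observe the race condition: When you run this against the BrokenRateLimiter, you should see output along these lines (exact counts vary from run to run because the request interleaving is nondeterministic):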

RACE CONDITION DETECTED!
The following violations indicate race conditions in the rate limiter:
- Client test_client_1: 12 requests allowed (expected max: 10, excess: 2)
- Client test_client_2: 11 requests allowed (expected max: 10, excess: 1)

4. Test the fix: Change rate_limiter = BrokenRateLimiter() to rate_limiter = FixedRateLimiter() in the server code, restart, and run the test again. You should see:

No rate limit violations detected.
The rate limiter appears to be working correctly under concurrent load.

This stress test is specifically engineered to trigger race conditions by sending tightly grouped concurrent requests and then carefully analyzing the timing of successful responses to detect when the rate limiter failed to enforce its limits.

Integrating Into Your Test Suite

To make this technique part of your regular QA process:
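
One possible starting point is to turn the summary dictionary returned by run_test() into a hard failure, so a CI job goes red when violations appear. This is a sketch only: `assert_no_violations` is a hypothetical helper, and the summaries below are hand-written illustrative data, not real test output.

```python
def assert_no_violations(summary):
    """Raise AssertionError if a stress-test summary contains violations.

    `summary` is assumed to have the shape returned by run_test():
    a dict whose "violations" entry is a list of violation dicts.
    """
    violations = summary["violations"]
    if violations:
        details = ", ".join(
            f"{v['client_id']}: {v['actual_count']} allowed "
            f"(limit {v['expected_limit']})"
            for v in violations
        )
        raise AssertionError(
            f"rate limit exceeded under concurrent load: {details}"
        )


# Hand-written summaries for illustration only
clean = {"violations": []}
assert_no_violations(clean)  # passes silently

broken = {"violations": [
    {"client_id": "test_client_1", "actual_count": 12, "expected_limit": 10},
]}
try:
    assert_no_violations(broken)
except AssertionError as exc:
    print(f"would fail the build: {exc}")
```

A test runner such as pytest or unittest treats the raised AssertionError as a failed test, so the helper can be called at the end of a test function that runs the stress test against a freshly started server.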

Key Takeaways

Race conditions in rate limiting logic are just one example of bugs that hide until concurrent load exposes them. The same principle applies to:

The next time you're designing stress tests, don't just focus on performance metrics. Think about what concurrent access patterns might reveal functional bugs that your sequential tests miss. Sometimes the most valuable insight from a stress test isn't how fast your system runs – it's discovering that it doesn't run correctly under realistic load conditions.

By treating stress testing as a functional testing tool, you'll catch bugs before your users do, and you'll build more robust systems that perform correctly under the concurrent conditions they'll face in production.

Interested in the full code example? As always, it's available on our GitHub page.