Skip to content

Rate Limiter Reference

sqldeps.rate_limiter

Rate limiting utilities for API calls.

This module provides classes for limiting the rate of API calls to stay within provider limits, in both single-process and multi-process contexts.

MultiprocessingRateLimiter

A shared rate limiter for multiprocessing environments.

Uses a manager to share state between processes, ensuring all processes collectively respect the RPM limit.

Attributes:

Name Type Description
call_times

A shared list of API call timestamps

lock

A shared lock for thread-safe operations

rpm

Maximum requests per minute allowed

window

Time window in seconds

Source code in sqldeps/rate_limiter.py
class MultiprocessingRateLimiter:
    """A shared rate limiter for multiprocessing environments.

    Uses a manager to share state between processes, ensuring
    all processes collectively respect the RPM limit.

    Attributes:
        call_times: A shared list of API call timestamps
        lock: A shared lock for thread-safe operations
        rpm: Maximum requests per minute allowed
        window: Time window in seconds
    """

    def __init__(self, manager: SyncManager, rpm: int) -> None:
        """Initialize with a multiprocessing manager and RPM limit.

        Args:
            manager: A multiprocessing.Manager instance
            rpm: Maximum requests per minute allowed
        """
        self.call_times = manager.list()
        self.lock = manager.RLock()
        self.rpm = rpm
        self.window = 60

    def wait_if_needed(self) -> None:
        """Ensures calls don't exceed the rate limit across processes."""
        if self.rpm <= 0:
            return

        with self.lock:
            now = time.time()
            cutoff = now - self.window

            # Remove old timestamps
            while self.call_times and self.call_times[0] < cutoff:
                self.call_times.pop(0)

            # Wait if at RPM limit
            if len(self.call_times) >= self.rpm:
                wait_time = max(0, self.call_times[0] + self.window - now)
                if wait_time > 0:
                    logger.debug(f"Rate limit reached. Waiting {wait_time:.2f} seconds")
                    time.sleep(wait_time)

                    # Recalculate after waiting
                    now = time.time()
                    cutoff = now - self.window
                    while self.call_times and self.call_times[0] < cutoff:
                        self.call_times.pop(0)

            # Record this call
            self.call_times.append(now)

__init__(manager, rpm)

Initialize with a multiprocessing manager and RPM limit.

Parameters:

Name Type Description Default
manager SyncManager

A multiprocessing.Manager instance

required
rpm int

Maximum requests per minute allowed

required
Source code in sqldeps/rate_limiter.py
def __init__(self, manager: SyncManager, rpm: int) -> None:
    """Initialize with a multiprocessing manager and RPM limit.

    Args:
        manager: A multiprocessing.Manager instance
        rpm: Maximum requests per minute allowed
    """
    self.call_times = manager.list()
    self.lock = manager.RLock()
    self.rpm = rpm
    self.window = 60

wait_if_needed()

Ensures calls don't exceed the rate limit across processes.

Source code in sqldeps/rate_limiter.py
def wait_if_needed(self) -> None:
    """Ensures calls don't exceed the rate limit across processes."""
    if self.rpm <= 0:
        return

    with self.lock:
        now = time.time()
        cutoff = now - self.window

        # Remove old timestamps
        while self.call_times and self.call_times[0] < cutoff:
            self.call_times.pop(0)

        # Wait if at RPM limit
        if len(self.call_times) >= self.rpm:
            wait_time = max(0, self.call_times[0] + self.window - now)
            if wait_time > 0:
                logger.debug(f"Rate limit reached. Waiting {wait_time:.2f} seconds")
                time.sleep(wait_time)

                # Recalculate after waiting
                now = time.time()
                cutoff = now - self.window
                while self.call_times and self.call_times[0] < cutoff:
                    self.call_times.pop(0)

        # Record this call
        self.call_times.append(now)

RateLimiter

Rate limiter to prevent exceeding API rate limits.

Tracks API call timestamps and enforces waiting periods to respect the specified requests per minute (RPM) limit.

Attributes:

Name Type Description
rpm

Maximum requests per minute allowed

call_times

Deque storing timestamps of recent API calls

window

Time window in seconds (default: 60 seconds = 1 minute)

Source code in sqldeps/rate_limiter.py
class RateLimiter:
    """Rate limiter to prevent exceeding API rate limits.

    Tracks API call timestamps and enforces waiting periods
    to respect the specified requests per minute (RPM) limit.

    Attributes:
        rpm: Maximum requests per minute allowed
        call_times: Deque storing timestamps of recent API calls
        window: Time window in seconds (default: 60 seconds = 1 minute)
    """

    def __init__(self, rpm: int) -> None:
        """Initialize the rate limiter with an RPM limit.

        Args:
            rpm: Maximum number of API requests allowed per minute
        """
        self.rpm = rpm
        self.call_times = deque()
        self.window = 60  # 60 seconds =  1 minute window

    def wait_if_needed(self) -> None:
        """Ensures that calls do not exceed the rate limit.

        If the limit is reached, it waits until a slot is available.
        """
        if self.rpm <= 0:  # Disable rate limiting if rpm is 0
            return

        now = time.time()

        # Remove timestamps older than our time window (60 seconds)
        cutoff = now - self.window
        while self.call_times and self.call_times[0] < cutoff:
            self.call_times.popleft()

        # If we've reached the RPM limit, wait until the oldest timestamp expires
        if len(self.call_times) >= self.rpm:
            wait_time = max(0, self.call_times[0] + self.window - now)
            if wait_time > 0:
                logger.debug(f"Rate limit reached. Waiting {wait_time:.2f} seconds")
                time.sleep(wait_time)

                # After waiting, recalculate current time and clean up again
                now = time.time()
                cutoff = now - self.window
                while self.call_times and self.call_times[0] < cutoff:
                    self.call_times.popleft()

        # Record this API call's timestamp
        self.call_times.append(now)

__init__(rpm)

Initialize the rate limiter with an RPM limit.

Parameters:

Name Type Description Default
rpm int

Maximum number of API requests allowed per minute

required
Source code in sqldeps/rate_limiter.py
def __init__(self, rpm: int) -> None:
    """Initialize the rate limiter with an RPM limit.

    Args:
        rpm: Maximum number of API requests allowed per minute
    """
    self.rpm = rpm
    self.call_times = deque()
    self.window = 60  # 60 seconds =  1 minute window

wait_if_needed()

Ensures that calls do not exceed the rate limit.

If the limit is reached, it waits until a slot is available.

Source code in sqldeps/rate_limiter.py
def wait_if_needed(self) -> None:
    """Ensures that calls do not exceed the rate limit.

    If the limit is reached, it waits until a slot is available.
    """
    if self.rpm <= 0:  # Disable rate limiting if rpm is 0
        return

    now = time.time()

    # Remove timestamps older than our time window (60 seconds)
    cutoff = now - self.window
    while self.call_times and self.call_times[0] < cutoff:
        self.call_times.popleft()

    # If we've reached the RPM limit, wait until the oldest timestamp expires
    if len(self.call_times) >= self.rpm:
        wait_time = max(0, self.call_times[0] + self.window - now)
        if wait_time > 0:
            logger.debug(f"Rate limit reached. Waiting {wait_time:.2f} seconds")
            time.sleep(wait_time)

            # After waiting, recalculate current time and clean up again
            now = time.time()
            cutoff = now - self.window
            while self.call_times and self.call_times[0] < cutoff:
                self.call_times.popleft()

    # Record this API call's timestamp
    self.call_times.append(now)