Skip to content

Performance & Rate Limiting

Optimize your VBAPI integration with rate limiting strategies, performance monitoring, and scaling best practices for production deployments.

Performance Disclaimer

All code examples on this page are illustrative pseudo-code and are not intended for direct production use.


Understanding Rate Limits

VBAPI enforces throttling using a token bucket–style algorithm to ensure fair usage and system stability.

Default Limits

  • 25 requests per second sustained rate
  • 50 requests per second burst rate
  • Short bursts above the sustained rate are tolerated within the token bucket's smoothing window
  • Per-client limits based on API key

When limits are exceeded, VBAPI returns 429 Too Many Requests.


Performance Optimization Strategies

Use Batch Operations

# Anti-pattern: one request per record — N records cost N API calls
# and eat into the rate limit.
for county in counties:
    create_county(county)  # 10 requests

# Preferred: the batch endpoint creates every record in one call.
create_counties_batch(counties)  # 1 request

Implement Efficient Pagination

def get_all_members_efficiently(page_size=500):
    """Retrieve every member by walking the paginated list endpoint.

    Pages are fetched sequentially with a short pause between calls so
    the loop stays well under the API's sustained rate limit. Iteration
    stops at the first empty page.
    """
    all_members = []
    current_page = 1

    while True:
        result = api_client.list_members(
            page=current_page,
            pageSize=page_size,
        )

        records = result['data']
        if not records:
            return all_members

        all_members.extend(records)
        current_page += 1

        # Pause ~100 ms between pages (caps us at ~10 requests/second).
        time.sleep(0.1)

Rate Limiting Strategies

Adaptive Rate Limiting

import time
from collections import deque

class AdaptiveRateLimiter:
    """Client-side limiter that backs off on 429s and recovers gradually.

    Keeps a one-second sliding window of request timestamps; halves the
    allowed rate whenever the server returns 429, and slowly raises it
    again after a sustained stretch of successful calls.
    """

    def __init__(self, initial_rate=20):
        self.current_rate = initial_rate  # requests per second
        self.request_times = deque()      # timestamps inside the 1s window
        self.last_429 = None              # when we last saw a 429, if ever
        self.consecutive_success = 0      # successes since the last 429

    def wait_if_needed(self):
        """Block just long enough to keep us under current_rate."""
        now = time.time()

        # Drop timestamps that have aged out of the one-second window.
        cutoff = now - 1
        while self.request_times and self.request_times[0] <= cutoff:
            self.request_times.popleft()

        # Window full? Sleep until the oldest entry expires.
        if len(self.request_times) >= self.current_rate:
            pause = 1 - (now - self.request_times[0])
            if pause > 0:
                time.sleep(pause)

        self.request_times.append(time.time())

    def handle_429_response(self):
        """Halve the target rate after the server throttles us."""
        self.last_429 = time.time()
        self.current_rate = max(1, int(self.current_rate * 0.5))
        self.consecutive_success = 0
        print(f"Rate limited! Reducing to {self.current_rate} req/sec")

    def handle_success_response(self):
        """Creep the rate back up once 429s have stopped for a while."""
        self.consecutive_success += 1

        quiet_long_enough = (
            self.last_429 and time.time() - self.last_429 > 60
        )
        if quiet_long_enough and self.consecutive_success >= 20:
            # A full minute without throttling: allow one more req/sec,
            # capped at the documented 25 req/sec sustained limit.
            self.current_rate = min(25, self.current_rate + 1)
            self.last_429 = None
            self.consecutive_success = 0

# Usage: one shared limiter instance paces every call below.
rate_limiter = AdaptiveRateLimiter()

def make_api_call(url, headers, data):
    """POST through the shared limiter, adapting the rate to responses."""
    rate_limiter.wait_if_needed()

    response = requests.post(url, headers=headers, json=data)

    if response.status_code != 429:
        rate_limiter.handle_success_response()
        return response

    # Throttled: back the limiter off and surface the failure to the caller.
    rate_limiter.handle_429_response()
    raise RateLimitExceeded()

Parallel Processing with Rate Limiting

import asyncio
import aiohttp
from asyncio import Semaphore

class AsyncVBAPIClient:
    """Async client that caps both concurrency and request rate.

    A semaphore bounds in-flight requests while a background pacer task
    drips permits into a queue at ``requests_per_second``; each request
    consumes one permit before it is sent.
    """

    def __init__(self, max_concurrent=10, requests_per_second=20):
        self.semaphore = Semaphore(max_concurrent)
        self.requests_per_second = requests_per_second
        # Bug fix: the pacer task is started lazily on the first request.
        # asyncio.create_task() in __init__ raised RuntimeError whenever
        # the client was constructed outside a running event loop.
        self.rate_limiter = None
        # Bounded queue so permits cannot pile up while the client is idle
        # (an unbounded queue allowed an arbitrarily large burst later).
        self.request_queue = asyncio.Queue(maxsize=requests_per_second)

    def _ensure_pacer(self):
        """Start the permit-producing task if it is not running yet."""
        if self.rate_limiter is None:
            self.rate_limiter = asyncio.create_task(
                self._rate_limiter(self.requests_per_second)
            )

    async def _rate_limiter(self, rps):
        """Produce one request permit every 1/rps seconds, forever."""
        while True:
            await self.request_queue.put(None)
            await asyncio.sleep(1 / rps)

    async def make_request(self, method, url, **kwargs):
        """Perform one rate-limited, concurrency-capped HTTP request.

        Returns the decoded JSON body of the response.
        """
        self._ensure_pacer()
        async with self.semaphore:
            # Block until the pacer grants a permit.
            await self.request_queue.get()

            # NOTE(review): a new session per request is simple but slow;
            # a long-lived shared session would reuse connections.
            async with aiohttp.ClientSession() as session:
                async with session.request(method, url, **kwargs) as response:
                    return await response.json()

# Process many requests in parallel while respecting rate limits
async def process_claims_parallel(claim_ids):
    """Fetch all claims concurrently while respecting rate limits.

    Failures come back in-place as exception objects instead of
    cancelling the whole batch.
    """
    api = AsyncVBAPIClient(max_concurrent=5, requests_per_second=20)

    pending = []
    for claim_id in claim_ids:
        pending.append(api.make_request('GET', f'/claims/{claim_id}'))

    return await asyncio.gather(*pending, return_exceptions=True)

Token Bucket Rate Limiter

import time
import threading

class TokenBucketRateLimiter:
    """Thread-safe token-bucket rate limiter.

    Tokens accrue continuously at ``rate`` per second up to ``capacity``
    (default: two seconds' worth), so short bursts up to the bucket size
    are allowed while the sustained throughput stays at ``rate``.
    """

    def __init__(self, rate, capacity=None):
        self.rate = rate                      # tokens added per second
        self.capacity = capacity or rate * 2  # maximum burst size
        self.tokens = self.capacity           # start with a full bucket
        self.last_update = time.time()
        self.lock = threading.Lock()

    def _refill(self, now):
        """Credit tokens for the time elapsed since the last update.

        Caller must hold ``self.lock``.
        """
        elapsed = now - self.last_update
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_update = now

    def acquire(self, tokens=1):
        """Try to take ``tokens`` from the bucket without blocking.

        Returns:
            True if the tokens were taken; otherwise the estimated wait
            time in seconds (a positive float).

        Warning:
            The failure value is a float, which is *truthy* — callers
            must compare with ``is True`` (or use ``wait_and_acquire``),
            never plain ``if limiter.acquire():``.
        """
        with self.lock:
            now = time.time()
            self._refill(now)

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True

            # Not enough tokens: report how long until there will be.
            return (tokens - self.tokens) / self.rate

    def wait_and_acquire(self, tokens=1):
        """Take ``tokens`` from the bucket, sleeping as long as needed.

        Always returns True. Bug fix: the previous version retried only
        once after sleeping and could return the float wait time to the
        caller if another thread drained the bucket in the meantime; this
        version loops until the tokens are actually acquired.
        """
        while True:
            result = self.acquire(tokens)
            if result is True:
                return True
            time.sleep(result)

# Usage: a single module-level bucket matched to the documented limits
# (25 req/sec sustained, 50 burst).
bucket = TokenBucketRateLimiter(rate=25, capacity=50)

def rate_limited_request(url, headers, data):
    """POST to VBAPI, blocking first until the shared bucket has a token."""
    bucket.wait_and_acquire()  # Wait if necessary
    return requests.post(url, headers=headers, json=data)

Increasing Rate Limits

Clients may request a Service Level Review via VBA Support for increased rate limits.

Approval Factors:

  • Expected production load - Documented traffic projections
  • Integration design - Efficient use of batch operations and pagination
  • Test results - Performance testing in lower environments
  • Business justification - Critical business processes requiring higher throughput

Request Process:

  1. Document current usage patterns and bottlenecks
  2. Provide load testing results from development/test environments
  3. Detail optimization efforts (batching, caching, etc.)
  4. Submit request to VBA Support with business justification

Performance Checklist

Pre-Production

  • Load testing completed with realistic data volumes
  • Rate limiting strategies implemented and tested
  • Caching strategy implemented for frequently accessed data
  • Error handling and retry logic tested

Production Deployment

  • Performance baselines established
  • Resource limits appropriate for expected load

Ongoing Optimization

  • Regular performance reviews scheduled
  • Rate limit utilization tracked
  • API usage patterns analyzed for improvements

This comprehensive performance guide ensures your VBAPI integration scales efficiently and maintains optimal performance under production loads.